Skip to main content

reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
//! state, whereas (1.) holds indefinitely. This means that once the state changes, transactions
//! classified as (2.) or (3.) need to be reevaluated.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{
92    map::{AddressSet, HashSet},
93    Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103    fmt,
104    sync::{
105        atomic::{AtomicBool, Ordering},
106        Arc,
107    },
108    time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the bounded channel created for each blob sidecar listener
/// (see `PoolInner::add_blob_sidecar_listener`).
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
/// Transaction pool internals.
///
/// Bundles the validator, the blob store, the lock-protected [`TxPool`] and the various
/// listener registries that are notified about pool changes.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Internal mapping of sender addresses to plain ints ([`SenderId`]s).
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator.
    validator: V,
    /// Storage for blob transactions
    blob_store: S,
    /// The internal pool that manages all transactions.
    pool: RwLock<TxPool<T>>,
    /// Pool settings.
    config: PoolConfig,
    /// Manages listeners for transaction state change events.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Hint whether `event_listener` may currently have subscribers.
    ///
    /// Set to `true` whenever a listener is subscribed and reset to `false` once the broadcast
    /// reports that it is empty; emit paths check it before taking the `event_listener` lock.
    has_event_listeners: AtomicBool,
    /// Listeners for new _full_ pending transactions.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners for new transactions added to the pool.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listeners for new blob transaction sidecars added to the pool.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics for the blob store
    blob_store_metrics: BlobStoreMetrics,
}
167
168// === impl PoolInner ===
169
170impl<V, T, S> PoolInner<V, T, S>
171where
172    V: TransactionValidator,
173    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174    S: BlobStore,
175{
176    /// Create a new transaction pool instance.
177    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178        Self {
179            identifiers: Default::default(),
180            validator,
181            event_listener: Default::default(),
182            has_event_listeners: AtomicBool::new(false),
183            pool: RwLock::new(TxPool::new(ordering, config.clone())),
184            pending_transaction_listener: Default::default(),
185            transaction_listener: Default::default(),
186            blob_transaction_sidecar_listener: Default::default(),
187            config,
188            blob_store,
189            blob_store_metrics: Default::default(),
190        }
191    }
192
    /// Returns a reference to the blob store this pool was created with.
    pub const fn blob_store(&self) -> &S {
        &self.blob_store
    }
197
    /// Returns stats about the size of the pool.
    ///
    /// Acquires a read lock on the inner pool for the duration of the call.
    pub fn size(&self) -> PoolSize {
        self.get_pool_data().size()
    }
202
    /// Returns the currently tracked block.
    ///
    /// Acquires a read lock on the inner pool for the duration of the call.
    pub fn block_info(&self) -> BlockInfo {
        self.get_pool_data().block_info()
    }
207    /// Sets the currently tracked block.
208    ///
209    /// This will also notify subscribers about any transactions that were promoted to the pending
210    /// pool due to fee changes.
211    pub fn set_block_info(&self, info: BlockInfo) {
212        let outcome = self.pool.write().set_block_info(info);
213
214        // Notify subscribers about promoted transactions due to fee changes
215        self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216    }
217
    /// Returns the internal [`SenderId`] for this address, allocating a new mapping when the
    /// address is first observed.
    ///
    /// This must only be used on paths that intentionally begin tracking a sender, such as
    /// transaction insertion. Read-only lookups should prefer [`Self::sender_id`] to avoid
    /// growing the sender-id map for unknown addresses.
    ///
    /// Takes the write lock on the sender identifiers.
    pub fn get_sender_id(&self, addr: Address) -> SenderId {
        self.identifiers.write().sender_id_or_create(addr)
    }
227
    /// Returns the internal [`SenderId`] for this address if it is already tracked.
    ///
    /// Unlike [`Self::get_sender_id`], this never allocates a new sender mapping and is therefore
    /// suitable for read-only queries or best-effort cleanup on unknown addresses.
    ///
    /// Only takes the read lock on the sender identifiers.
    pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
        self.identifiers.read().sender_id(addr)
    }
235
    /// Returns the internal [`SenderId`]s for the given addresses.
    ///
    /// Takes the write lock on the sender identifiers once for the whole batch.
    ///
    /// NOTE(review): going by the name of `sender_ids_or_create`, unknown addresses presumably
    /// get a new mapping allocated, like in [`Self::get_sender_id`] — confirm.
    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
        self.identifiers.write().sender_ids_or_create(addrs)
    }
240
    /// Returns all senders in the pool.
    ///
    /// Acquires a read lock on the inner pool for the duration of the call.
    pub fn unique_senders(&self) -> AddressSet {
        self.get_pool_data().unique_senders()
    }
245
246    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
247    /// used for __tracked__ accounts)
248    fn changed_senders(
249        &self,
250        accs: impl Iterator<Item = ChangedAccount>,
251    ) -> FxHashMap<SenderId, SenderInfo> {
252        let identifiers = self.identifiers.read();
253        accs.into_iter()
254            .filter_map(|acc| {
255                let ChangedAccount { address, nonce, balance } = acc;
256                let sender_id = identifiers.sender_id(&address)?;
257                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
258            })
259            .collect()
260    }
261
    /// Returns the [`PoolConfig`] the pool was configured with.
    pub const fn config(&self) -> &PoolConfig {
        &self.config
    }
266
    /// Returns a reference to the transaction validator of this pool.
    pub const fn validator(&self) -> &V {
        &self.validator
    }
271
272    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
273    /// transaction inserted into the pool
274    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
275        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
276        let listener = PendingTransactionHashListener { sender, kind };
277
278        let mut listeners = self.pending_transaction_listener.write();
279        // Clean up dead listeners before adding new one
280        listeners.retain(|l| !l.sender.is_closed());
281        listeners.push(listener);
282
283        rx
284    }
285
286    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
287    pub fn add_new_transaction_listener(
288        &self,
289        kind: TransactionListenerKind,
290    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
291        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
292        let listener = TransactionListener { sender, kind };
293
294        let mut listeners = self.transaction_listener.write();
295        // Clean up dead listeners before adding new one
296        listeners.retain(|l| !l.sender.is_closed());
297        listeners.push(listener);
298
299        rx
300    }
301    /// Adds a new blob sidecar listener to the pool that gets notified about every new
302    /// eip4844 transaction's blob sidecar.
303    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
304        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
305        let listener = BlobTransactionSidecarListener { sender };
306        self.blob_transaction_sidecar_listener.lock().push(listener);
307        rx
308    }
309
310    /// If the pool contains the transaction, this adds a new listener that gets notified about
311    /// transaction events.
312    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
313        if !self.get_pool_data().contains(&tx_hash) {
314            return None
315        }
316        let mut listener = self.event_listener.write();
317        let events = listener.subscribe(tx_hash);
318        self.mark_event_listener_installed();
319        Some(events)
320    }
321
322    /// Adds a listener for all transaction events.
323    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
324        let mut listener = self.event_listener.write();
325        let events = listener.subscribe_all();
326        self.mark_event_listener_installed();
327        events
328    }
329
    /// Returns the current value of the `has_event_listeners` flag.
    ///
    /// This is a relaxed load that does not touch the `event_listener` lock.
    #[inline]
    fn has_event_listeners(&self) -> bool {
        self.has_event_listeners.load(Ordering::Relaxed)
    }
334
    /// Sets the `has_event_listeners` flag so that emit paths no longer skip the event
    /// broadcast.
    #[inline]
    fn mark_event_listener_installed(&self) {
        self.has_event_listeners.store(true, Ordering::Relaxed);
    }
339
340    #[inline]
341    fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
342        if listener.is_empty() {
343            self.has_event_listeners.store(false, Ordering::Relaxed);
344        }
345    }
346
347    #[inline]
348    fn with_event_listener<F>(&self, emit: F)
349    where
350        F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
351    {
352        if !self.has_event_listeners() {
353            return
354        }
355        let mut listener = self.event_listener.write();
356        if !listener.is_empty() {
357            emit(&mut listener);
358        }
359        self.update_event_listener_state(&listener);
360    }
361
    /// Returns a read lock to the pool's data.
    ///
    /// The lock is held for as long as the returned guard is alive.
    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
        self.pool.read()
    }
366
367    /// Returns transactions in the pool that can be propagated
368    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
369        let mut out = Vec::new();
370        self.append_pooled_transactions(&mut out);
371        out
372    }
373
374    /// Returns hashes of transactions in the pool that can be propagated.
375    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
376        let mut out = Vec::new();
377        self.append_pooled_transactions_hashes(&mut out);
378        out
379    }
380
381    /// Returns only the first `max` transactions in the pool that can be propagated.
382    pub fn pooled_transactions_max(
383        &self,
384        max: usize,
385    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
386        let mut out = Vec::new();
387        self.append_pooled_transactions_max(max, &mut out);
388        out
389    }
390
391    /// Extends the given vector with all transactions in the pool that can be propagated.
392    pub fn append_pooled_transactions(
393        &self,
394        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
395    ) {
396        out.extend(
397            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
398        );
399    }
400
401    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
402    /// be propagated.
403    pub fn append_pooled_transaction_elements(
404        &self,
405        tx_hashes: &[TxHash],
406        limit: GetPooledTransactionLimit,
407        out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
408    ) where
409        <V as TransactionValidator>::Transaction: EthPoolTransaction,
410    {
411        let transactions = self.get_all_propagatable(tx_hashes);
412        let mut size = 0;
413        for transaction in transactions {
414            let encoded_len = transaction.encoded_length();
415            let Some(pooled) = self.to_pooled_transaction(transaction) else {
416                continue;
417            };
418
419            size += encoded_len;
420            out.push(pooled.into_inner());
421
422            if limit.exceeds(size) {
423                break
424            }
425        }
426    }
427
428    /// Extends the given vector with the hashes of all transactions in the pool that can be
429    /// propagated.
430    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
431        out.extend(
432            self.get_pool_data()
433                .all()
434                .transactions_iter()
435                .filter(|tx| tx.propagate)
436                .map(|tx| *tx.hash()),
437        );
438    }
439
440    /// Extends the given vector with only the first `max` transactions in the pool that can be
441    /// propagated.
442    pub fn append_pooled_transactions_max(
443        &self,
444        max: usize,
445        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
446    ) {
447        out.extend(
448            self.get_pool_data()
449                .all()
450                .transactions_iter()
451                .filter(|tx| tx.propagate)
452                .take(max)
453                .cloned(),
454        );
455    }
456
457    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
458    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
459        if max == 0 {
460            return Vec::new();
461        }
462        self.get_pool_data()
463            .all()
464            .transactions_iter()
465            .filter(|tx| tx.propagate)
466            .take(max)
467            .map(|tx| *tx.hash())
468            .collect()
469    }
470
471    /// Converts the internally tracked transaction to the pooled format.
472    ///
473    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
474    /// store and attached to the transaction.
475    fn to_pooled_transaction(
476        &self,
477        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
478    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
479    where
480        <V as TransactionValidator>::Transaction: EthPoolTransaction,
481    {
482        if transaction.is_eip4844() {
483            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
484            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
485        } else {
486            transaction
487                .transaction
488                .clone_into_pooled()
489                .inspect_err(|err| {
490                    debug!(
491                        target: "txpool", %err,
492                        "failed to convert transaction to pooled element; skipping",
493                    );
494                })
495                .ok()
496        }
497    }
498
499    /// Returns pooled transactions for the given transaction hashes that are allowed to be
500    /// propagated.
501    pub fn get_pooled_transaction_elements(
502        &self,
503        tx_hashes: Vec<TxHash>,
504        limit: GetPooledTransactionLimit,
505    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
506    where
507        <V as TransactionValidator>::Transaction: EthPoolTransaction,
508    {
509        let mut elements = Vec::new();
510        self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
511        elements.shrink_to_fit();
512        elements
513    }
514
515    /// Returns converted pooled transaction for the given transaction hash.
516    pub fn get_pooled_transaction_element(
517        &self,
518        tx_hash: TxHash,
519    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
520    where
521        <V as TransactionValidator>::Transaction: EthPoolTransaction,
522    {
523        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
524    }
525
526    /// Updates the entire pool after a new block was executed.
527    pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
528        trace!(target: "txpool", ?update, "updating pool on canonical state change");
529
530        let block_info = update.block_info();
531        let CanonicalStateUpdate {
532            new_tip, changed_accounts, mined_transactions, update_kind, ..
533        } = update;
534        self.validator.on_new_head_block(new_tip);
535
536        let changed_senders = self.changed_senders(changed_accounts.into_iter());
537
538        // update the pool
539        let outcome = self.pool.write().on_canonical_state_change(
540            block_info,
541            mined_transactions,
542            changed_senders,
543            update_kind,
544        );
545
546        // This will discard outdated transactions based on the account's nonce
547        self.delete_discarded_blobs(outcome.discarded.iter());
548
549        // notify listeners about updates
550        self.notify_on_new_state(outcome);
551    }
552
553    /// Performs account updates on the pool.
554    ///
555    /// This will either promote or discard transactions based on the new account state.
556    ///
557    /// This should be invoked when the pool drifted and accounts are updated manually
558    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
559        let changed_senders = self.changed_senders(accounts.into_iter());
560        let UpdateOutcome { promoted, discarded } =
561            self.pool.write().update_accounts(changed_senders);
562
563        self.notify_on_transaction_updates(promoted, discarded);
564    }
565
566    /// Add a single validated transaction into the pool.
567    ///
568    /// Returns the outcome and optionally metadata to be processed after the pool lock is
569    /// released.
570    ///
571    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
572    /// come in through that function, either as a batch or `std::iter::once`.
573    fn add_transaction(
574        &self,
575        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
576        origin: TransactionOrigin,
577        tx: TransactionValidationOutcome<T::Transaction>,
578    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
579        match tx {
580            TransactionValidationOutcome::Valid {
581                balance,
582                state_nonce,
583                transaction,
584                propagate,
585                bytecode_hash,
586                authorities,
587            } => {
588                let sender_id = self.get_sender_id(transaction.sender());
589                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
590
591                // split the valid transaction and the blob sidecar if it has any
592                let (transaction, blob_sidecar) = match transaction {
593                    ValidTransaction::Valid(tx) => (tx, None),
594                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
595                        debug_assert!(
596                            transaction.is_eip4844(),
597                            "validator returned sidecar for non EIP-4844 transaction"
598                        );
599                        (transaction, Some(sidecar))
600                    }
601                };
602
603                let tx = ValidPoolTransaction {
604                    transaction,
605                    transaction_id,
606                    propagate,
607                    timestamp: Instant::now(),
608                    origin,
609                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
610                };
611
612                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
613                    Ok(added) => added,
614                    Err(err) => return (Err(err), None),
615                };
616                let hash = *added.hash();
617                let state = added.transaction_state();
618
619                let meta = AddedTransactionMeta { added, blob_sidecar };
620
621                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
622            }
623            TransactionValidationOutcome::Invalid(tx, err) => {
624                self.with_event_listener(|listener| listener.invalid(tx.hash()));
625                (Err(PoolError::new(*tx.hash(), err)), None)
626            }
627            TransactionValidationOutcome::Error(tx_hash, err) => {
628                self.with_event_listener(|listener| listener.discarded(&tx_hash));
629                (Err(PoolError::other(tx_hash, err)), None)
630            }
631        }
632    }
633
634    /// Adds a transaction and returns the event stream.
635    pub fn add_transaction_and_subscribe(
636        &self,
637        origin: TransactionOrigin,
638        tx: TransactionValidationOutcome<T::Transaction>,
639    ) -> PoolResult<TransactionEvents> {
640        let listener = {
641            let mut listener = self.event_listener.write();
642            let events = listener.subscribe(tx.tx_hash());
643            self.mark_event_listener_installed();
644            events
645        };
646        let mut results = self.add_transactions(origin, std::iter::once(tx));
647        results.pop().expect("result length is the same as the input")?;
648        Ok(listener)
649    }
650
651    /// Adds all transactions in the iterator to the pool, returning a list of results.
652    ///
653    /// Convenience method that assigns the same origin to all transactions. Delegates to
654    /// [`Self::add_transactions_with_origins`].
655    pub fn add_transactions(
656        &self,
657        origin: TransactionOrigin,
658        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
659    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
660        self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
661    }
662
663    /// Adds all transactions in the iterator to the pool, each with its own
664    /// [`TransactionOrigin`], returning a list of results.
665    pub fn add_transactions_with_origins(
666        &self,
667        transactions: impl IntoIterator<
668            Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
669        >,
670    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
671        // Collect results and metadata while holding the pool write lock
672        let (mut results, added_metas, discarded) = {
673            let mut pool = self.pool.write();
674            let mut added_metas = Vec::new();
675
676            let results = transactions
677                .into_iter()
678                .map(|(origin, tx)| {
679                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);
680
681                    // Only collect metadata for successful insertions
682                    if result.is_ok() &&
683                        let Some(meta) = meta
684                    {
685                        added_metas.push(meta);
686                    }
687
688                    result
689                })
690                .collect::<Vec<_>>();
691
692            // Enforce the pool size limits if at least one transaction was added successfully
693            let discarded = if results.iter().any(Result::is_ok) {
694                pool.discard_worst()
695            } else {
696                Default::default()
697            };
698
699            (results, added_metas, discarded)
700        };
701
702        for meta in added_metas {
703            self.on_added_transaction(meta);
704        }
705
706        if !discarded.is_empty() {
707            // Delete any blobs associated with discarded blob transactions
708            self.delete_discarded_blobs(discarded.iter());
709            self.with_event_listener(|listener| listener.discarded_many(&discarded));
710
711            let discarded_hashes =
712                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
713
714            // A newly added transaction may be immediately discarded, so we need to
715            // adjust the result here
716            for res in &mut results {
717                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
718                    discarded_hashes.contains(hash)
719                {
720                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
721                }
722            }
723        };
724
725        results
726    }
727
728    /// Process a transaction that was added to the pool.
729    ///
730    /// Performs blob storage operations and sends all notifications. This should be called
731    /// after the pool write lock has been released to avoid blocking pool operations.
732    fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
733        // Handle blob sidecar storage and notifications for EIP-4844 transactions
734        if let Some(sidecar) = meta.blob_sidecar {
735            let hash = *meta.added.hash();
736            self.on_new_blob_sidecar(&hash, &sidecar);
737            self.insert_blob(hash, sidecar);
738        }
739
740        // Delete replaced blob sidecar if any
741        if let Some(replaced) = meta.added.replaced_blob_transaction() {
742            debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
743            self.delete_blob(replaced);
744        }
745
746        // Delete discarded blob sidecars if any, this doesnt do any IO.
747        if let Some(discarded) = meta.added.discarded_transactions() {
748            self.delete_discarded_blobs(discarded.iter());
749        }
750
751        // Notify pending transaction listeners
752        if let Some(pending) = meta.added.as_pending() {
753            self.on_new_pending_transaction(pending);
754        }
755
756        // Notify event listeners
757        self.notify_event_listeners(&meta.added);
758
759        // Notify new transaction listeners
760        self.on_new_transaction(meta.added.into_new_transaction_event());
761    }
762
763    /// Notify all listeners about a new pending transaction.
764    ///
765    /// See also [`Self::add_pending_listener`]
766    ///
767    /// CAUTION: This function is only intended to be used manually in order to use this type's
768    /// pending transaction receivers when manually implementing the
769    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
770    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
771    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
772        let mut needs_cleanup = false;
773
774        {
775            let listeners = self.pending_transaction_listener.read();
776            for listener in listeners.iter() {
777                if !listener.send_all(pending.pending_transactions(listener.kind)) {
778                    needs_cleanup = true;
779                }
780            }
781        }
782
783        // Clean up dead listeners if we detected any closed channels
784        if needs_cleanup {
785            self.pending_transaction_listener
786                .write()
787                .retain(|listener| !listener.sender.is_closed());
788        }
789    }
790
791    /// Notify all listeners about a newly inserted pending transaction.
792    ///
793    /// See also [`Self::add_new_transaction_listener`]
794    ///
795    /// CAUTION: This function is only intended to be used manually in order to use this type's
796    /// transaction receivers when manually implementing the
797    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
798    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
799    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
800        let mut needs_cleanup = false;
801
802        {
803            let listeners = self.transaction_listener.read();
804            for listener in listeners.iter() {
805                if listener.kind.is_propagate_only() && !event.transaction.propagate {
806                    if listener.sender.is_closed() {
807                        needs_cleanup = true;
808                    }
809                    // Skip non-propagate transactions for propagate-only listeners
810                    continue
811                }
812
813                if !listener.send(event.clone()) {
814                    needs_cleanup = true;
815                }
816            }
817        }
818
819        // Clean up dead listeners if we detected any closed channels
820        if needs_cleanup {
821            self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
822        }
823    }
824
825    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
826    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
827        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
828        if sidecar_listeners.is_empty() {
829            return
830        }
831        let sidecar = Arc::new(sidecar.clone());
832        sidecar_listeners.retain_mut(|listener| {
833            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
834            match listener.sender.try_send(new_blob_event) {
835                Ok(()) => true,
836                Err(err) => {
837                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
838                        debug!(
839                            target: "txpool",
840                            "[{:?}] failed to send blob sidecar; channel full",
841                            sidecar,
842                        );
843                        true
844                    } else {
845                        false
846                    }
847                }
848            }
849        })
850    }
851
852    /// Notifies transaction listeners about changes once a block was processed.
853    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
854        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
855
856        // notify about promoted pending transactions - emit hashes
857        let mut needs_pending_cleanup = false;
858        {
859            let listeners = self.pending_transaction_listener.read();
860            for listener in listeners.iter() {
861                if !listener.send_all(outcome.pending_transactions(listener.kind)) {
862                    needs_pending_cleanup = true;
863                }
864            }
865        }
866        if needs_pending_cleanup {
867            self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
868        }
869
870        // emit full transactions
871        let mut needs_tx_cleanup = false;
872        {
873            let listeners = self.transaction_listener.read();
874            for listener in listeners.iter() {
875                if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
876                    needs_tx_cleanup = true;
877                }
878            }
879        }
880        if needs_tx_cleanup {
881            self.transaction_listener.write().retain(|l| !l.sender.is_closed());
882        }
883
884        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
885
886        // broadcast specific transaction events
887        self.with_event_listener(|listener| {
888            for tx in &mined {
889                listener.mined(tx, block_hash);
890            }
891            for tx in &promoted {
892                listener.pending(tx.hash(), None);
893            }
894            for tx in &discarded {
895                listener.discarded(tx.hash());
896            }
897        })
898    }
899
900    /// Notifies all listeners about the transaction movements.
901    ///
902    /// This will emit events according to the provided changes.
903    ///
904    /// CAUTION: This function is only intended to be used manually in order to use this type's
905    /// [`TransactionEvents`] receivers when manually implementing the
906    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
907    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
908    #[allow(clippy::type_complexity)]
909    pub fn notify_on_transaction_updates(
910        &self,
911        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
912        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
913    ) {
914        // Notify about promoted pending transactions (similar to notify_on_new_state)
915        if !promoted.is_empty() {
916            let mut needs_pending_cleanup = false;
917            {
918                let listeners = self.pending_transaction_listener.read();
919                for listener in listeners.iter() {
920                    let promoted_hashes = promoted.iter().filter_map(|tx| {
921                        if listener.kind.is_propagate_only() && !tx.propagate {
922                            None
923                        } else {
924                            Some(*tx.hash())
925                        }
926                    });
927                    if !listener.send_all(promoted_hashes) {
928                        needs_pending_cleanup = true;
929                    }
930                }
931            }
932            if needs_pending_cleanup {
933                self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
934            }
935
936            // in this case we should also emit promoted transactions in full
937            let mut needs_tx_cleanup = false;
938            {
939                let listeners = self.transaction_listener.read();
940                for listener in listeners.iter() {
941                    let promoted_txs = promoted.iter().filter_map(|tx| {
942                        if listener.kind.is_propagate_only() && !tx.propagate {
943                            None
944                        } else {
945                            Some(NewTransactionEvent::pending(tx.clone()))
946                        }
947                    });
948                    if !listener.send_all(promoted_txs) {
949                        needs_tx_cleanup = true;
950                    }
951                }
952            }
953            if needs_tx_cleanup {
954                self.transaction_listener.write().retain(|l| !l.sender.is_closed());
955            }
956        }
957
958        self.with_event_listener(|listener| {
959            for tx in &promoted {
960                listener.pending(tx.hash(), None);
961            }
962            for tx in &discarded {
963                listener.discarded(tx.hash());
964            }
965        });
966
967        if !discarded.is_empty() {
968            // This deletes outdated blob txs from the blob store, based on the account's nonce.
969            // This is called during txpool maintenance when the pool drifted.
970            self.delete_discarded_blobs(discarded.iter());
971        }
972    }
973
974    /// Fire events for the newly added transaction if there are any.
975    ///
976    /// See also [`Self::add_transaction_event_listener`].
977    ///
978    /// CAUTION: This function is only intended to be used manually in order to use this type's
979    /// [`TransactionEvents`] receivers when manually implementing the
980    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
981    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
982    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
983        self.with_event_listener(|listener| match tx {
984            AddedTransaction::Pending(tx) => {
985                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
986
987                listener.pending(transaction.hash(), replaced.clone());
988                for tx in promoted {
989                    listener.pending(tx.hash(), None);
990                }
991                for tx in discarded {
992                    listener.discarded(tx.hash());
993                }
994            }
995            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
996                listener.queued(transaction.hash(), queued_reason.clone());
997                if let Some(replaced) = replaced {
998                    listener.replaced(replaced.clone(), *transaction.hash());
999                }
1000            }
1001        });
1002    }
1003
1004    /// Returns an iterator that yields transactions that are ready to be included in the block.
1005    pub fn best_transactions(&self) -> BestTransactions<T> {
1006        self.get_pool_data().best_transactions()
1007    }
1008
1009    /// Returns an iterator that yields transactions that are ready to be included in the block with
1010    /// the given base fee and optional blob fee attributes.
1011    pub fn best_transactions_with_attributes(
1012        &self,
1013        best_transactions_attributes: BestTransactionsAttributes,
1014    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1015    {
1016        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1017    }
1018
1019    /// Returns only the first `max` transactions in the pending pool.
1020    pub fn pending_transactions_max(
1021        &self,
1022        max: usize,
1023    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1024        self.get_pool_data().pending_transactions_iter().take(max).collect()
1025    }
1026
1027    /// Returns all transactions from the pending sub-pool
1028    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1029        self.get_pool_data().pending_transactions()
1030    }
1031
1032    /// Returns all transactions from parked pools
1033    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1034        self.get_pool_data().queued_transactions()
1035    }
1036
1037    /// Returns all transactions in the pool
1038    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1039        let pool = self.get_pool_data();
1040        AllPoolTransactions {
1041            pending: pool.pending_transactions(),
1042            queued: pool.queued_transactions(),
1043        }
1044    }
1045
1046    /// Returns _all_ transactions in the pool
1047    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1048        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1049    }
1050
1051    /// Removes and returns all matching transactions from the pool.
1052    ///
1053    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
1054    /// nonce gap for the given transactions.
1055    pub fn remove_transactions(
1056        &self,
1057        hashes: Vec<TxHash>,
1058    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1059        if hashes.is_empty() {
1060            return Vec::new()
1061        }
1062        let removed = self.pool.write().remove_transactions(hashes);
1063
1064        self.with_event_listener(|listener| listener.discarded_many(&removed));
1065
1066        removed
1067    }
1068
1069    /// Removes and returns all matching transactions and their dependent transactions from the
1070    /// pool.
1071    pub fn remove_transactions_and_descendants(
1072        &self,
1073        hashes: Vec<TxHash>,
1074    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1075        if hashes.is_empty() {
1076            return Vec::new()
1077        }
1078        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1079
1080        self.with_event_listener(|listener| {
1081            for tx in &removed {
1082                listener.discarded(tx.hash());
1083            }
1084        });
1085
1086        removed
1087    }
1088
1089    /// Removes and returns all transactions by the specified sender from the pool.
1090    pub fn remove_transactions_by_sender(
1091        &self,
1092        sender: Address,
1093    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1094        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1095        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1096
1097        self.with_event_listener(|listener| listener.discarded_many(&removed));
1098
1099        removed
1100    }
1101
1102    /// Prunes and returns all matching transactions from the pool.
1103    ///
1104    /// This removes the transactions as if they were mined: descendant transactions are **not**
1105    /// parked and remain eligible for inclusion.
1106    pub fn prune_transactions(
1107        &self,
1108        hashes: Vec<TxHash>,
1109    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1110        if hashes.is_empty() {
1111            return Vec::new()
1112        }
1113
1114        self.pool.write().prune_transactions(hashes)
1115    }
1116
1117    /// Removes and returns all transactions that are present in the pool.
1118    pub fn retain_unknown<A>(&self, announcement: &mut A)
1119    where
1120        A: HandleMempoolData,
1121    {
1122        if announcement.is_empty() {
1123            return
1124        }
1125        let pool = self.get_pool_data();
1126        announcement.retain_by_hash(|tx| !pool.contains(tx))
1127    }
1128
1129    /// Returns the transaction by hash.
1130    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1131        self.get_pool_data().get(tx_hash)
1132    }
1133
1134    /// Returns all transactions of the address
1135    pub fn get_transactions_by_sender(
1136        &self,
1137        sender: Address,
1138    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1139        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1140        self.get_pool_data().get_transactions_by_sender(sender_id)
1141    }
1142
1143    /// Returns a pending transaction sent by the given sender with the given nonce.
1144    pub fn get_pending_transaction_by_sender_and_nonce(
1145        &self,
1146        sender: Address,
1147        nonce: u64,
1148    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1149        let sender_id = self.sender_id(&sender)?;
1150        self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1151    }
1152
1153    /// Returns all queued transactions of the address by sender
1154    pub fn get_queued_transactions_by_sender(
1155        &self,
1156        sender: Address,
1157    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1158        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1159        self.get_pool_data().queued_txs_by_sender(sender_id)
1160    }
1161
1162    /// Returns all pending transactions filtered by predicate
1163    pub fn pending_transactions_with_predicate(
1164        &self,
1165        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1166    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1167        self.get_pool_data().pending_transactions_with_predicate(predicate)
1168    }
1169
1170    /// Returns all pending transactions of the address by sender
1171    pub fn get_pending_transactions_by_sender(
1172        &self,
1173        sender: Address,
1174    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1175        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1176        self.get_pool_data().pending_txs_by_sender(sender_id)
1177    }
1178
1179    /// Returns the highest transaction of the address
1180    pub fn get_highest_transaction_by_sender(
1181        &self,
1182        sender: Address,
1183    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1184        let sender_id = self.sender_id(&sender)?;
1185        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1186    }
1187
1188    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
1189    pub fn get_highest_consecutive_transaction_by_sender(
1190        &self,
1191        sender: Address,
1192        on_chain_nonce: u64,
1193    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1194        let sender_id = self.sender_id(&sender)?;
1195        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1196            sender_id.into_transaction_id(on_chain_nonce),
1197        )
1198    }
1199
1200    /// Returns the transaction given a [`TransactionId`]
1201    pub fn get_transaction_by_transaction_id(
1202        &self,
1203        transaction_id: &TransactionId,
1204    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1205        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1206    }
1207
1208    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1209    pub fn get_transactions_by_origin(
1210        &self,
1211        origin: TransactionOrigin,
1212    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1213        self.get_pool_data()
1214            .all()
1215            .transactions_iter()
1216            .filter(|tx| tx.origin == origin)
1217            .cloned()
1218            .collect()
1219    }
1220
1221    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1222    pub fn get_pending_transactions_by_origin(
1223        &self,
1224        origin: TransactionOrigin,
1225    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1226        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1227    }
1228
1229    /// Returns all the transactions belonging to the hashes.
1230    ///
1231    /// If no transaction exists, it is skipped.
1232    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1233        if txs.is_empty() {
1234            return Vec::new()
1235        }
1236        self.get_pool_data().get_all(txs).collect()
1237    }
1238
1239    /// Returns all the transactions belonging to the hashes that are propagatable.
1240    ///
1241    /// If no transaction exists, it is skipped.
1242    fn get_all_propagatable(
1243        &self,
1244        txs: &[TxHash],
1245    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1246        if txs.is_empty() {
1247            return Vec::new()
1248        }
1249        let pool = self.get_pool_data();
1250        txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1251    }
1252
1253    /// Notify about propagated transactions.
1254    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1255        if txs.is_empty() {
1256            return
1257        }
1258        self.with_event_listener(|listener| {
1259            txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1260        });
1261    }
1262
1263    /// Number of transactions in the entire pool
1264    pub fn len(&self) -> usize {
1265        self.get_pool_data().len()
1266    }
1267
1268    /// Whether the pool is empty
1269    pub fn is_empty(&self) -> bool {
1270        self.get_pool_data().is_empty()
1271    }
1272
1273    /// Returns whether or not the pool is over its configured size and transaction count limits.
1274    pub fn is_exceeded(&self) -> bool {
1275        self.pool.read().is_exceeded()
1276    }
1277
1278    /// Inserts a blob transaction into the blob store
1279    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1280        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1281        if let Err(err) = self.blob_store.insert(hash, blob) {
1282            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1283            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1284        }
1285        self.update_blob_store_metrics();
1286    }
1287
1288    /// Delete a blob from the blob store
1289    pub fn delete_blob(&self, blob: TxHash) {
1290        let _ = self.blob_store.delete(blob);
1291    }
1292
1293    /// Delete all blobs from the blob store
1294    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1295        let _ = self.blob_store.delete_all(txs);
1296    }
1297
1298    /// Cleans up the blob store
1299    pub fn cleanup_blobs(&self) {
1300        let stat = self.blob_store.cleanup();
1301        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1302        self.update_blob_store_metrics();
1303    }
1304
1305    fn update_blob_store_metrics(&self) {
1306        if let Some(data_size) = self.blob_store.data_size_hint() {
1307            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1308        }
1309        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1310    }
1311
1312    /// Deletes all blob transactions that were discarded.
1313    fn delete_discarded_blobs<'a>(
1314        &'a self,
1315        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1316    ) {
1317        let blob_txs = transactions
1318            .into_iter()
1319            .filter(|tx| tx.transaction.is_eip4844())
1320            .map(|tx| *tx.hash())
1321            .collect();
1322        self.delete_blobs(blob_txs);
1323    }
1324}
1325
1326impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1327    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1328        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1329    }
1330}
1331
1332/// Metadata for a transaction that was added to the pool.
1333///
1334/// This holds all the data needed to complete post-insertion operations (notifications,
1335/// blob storage).
1336#[derive(Debug)]
1337struct AddedTransactionMeta<T: PoolTransaction> {
1338    /// The transaction that was added to the pool
1339    added: AddedTransaction<T>,
1340    /// Optional blob sidecar for EIP-4844 transactions
1341    blob_sidecar: Option<BlobTransactionSidecarVariant>,
1342}
1343
1344/// Tracks an added transaction and all graph changes caused by adding it.
1345#[derive(Debug, Clone)]
1346pub struct AddedPendingTransaction<T: PoolTransaction> {
1347    /// Inserted transaction.
1348    pub transaction: Arc<ValidPoolTransaction<T>>,
1349    /// Replaced transaction.
1350    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1351    /// transactions promoted to the pending queue
1352    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1353    /// transactions that failed and became discarded
1354    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1355}
1356
1357impl<T: PoolTransaction> AddedPendingTransaction<T> {
1358    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1359    /// [`TransactionListenerKind`].
1360    ///
1361    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1362    /// are allowed to be propagated are returned.
1363    pub(crate) fn pending_transactions(
1364        &self,
1365        kind: TransactionListenerKind,
1366    ) -> impl Iterator<Item = B256> + '_ {
1367        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1368        PendingTransactionIter { kind, iter }
1369    }
1370}
1371
1372pub(crate) struct PendingTransactionIter<Iter> {
1373    kind: TransactionListenerKind,
1374    iter: Iter,
1375}
1376
1377impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1378where
1379    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1380    T: PoolTransaction + 'a,
1381{
1382    type Item = B256;
1383
1384    fn next(&mut self) -> Option<Self::Item> {
1385        loop {
1386            let next = self.iter.next()?;
1387            if self.kind.is_propagate_only() && !next.propagate {
1388                continue
1389            }
1390            return Some(*next.hash())
1391        }
1392    }
1393}
1394
1395/// An iterator over full pending transactions
1396pub(crate) struct FullPendingTransactionIter<Iter> {
1397    kind: TransactionListenerKind,
1398    iter: Iter,
1399}
1400
1401impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1402where
1403    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1404    T: PoolTransaction + 'a,
1405{
1406    type Item = NewTransactionEvent<T>;
1407
1408    fn next(&mut self) -> Option<Self::Item> {
1409        loop {
1410            let next = self.iter.next()?;
1411            if self.kind.is_propagate_only() && !next.propagate {
1412                continue
1413            }
1414            return Some(NewTransactionEvent {
1415                subpool: SubPool::Pending,
1416                transaction: next.clone(),
1417            })
1418        }
1419    }
1420}
1421
1422/// Represents a transaction that was added into the pool and its state
1423#[derive(Debug, Clone)]
1424pub enum AddedTransaction<T: PoolTransaction> {
1425    /// Transaction was successfully added and moved to the pending pool.
1426    Pending(AddedPendingTransaction<T>),
1427    /// Transaction was successfully added but not yet ready for processing and moved to a
1428    /// parked pool instead.
1429    Parked {
1430        /// Inserted transaction.
1431        transaction: Arc<ValidPoolTransaction<T>>,
1432        /// Replaced transaction.
1433        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1434        /// The subpool it was moved to.
1435        subpool: SubPool,
1436        /// The specific reason why the transaction is queued (if applicable).
1437        queued_reason: Option<QueuedReason>,
1438    },
1439}
1440
1441impl<T: PoolTransaction> AddedTransaction<T> {
1442    /// Returns whether the transaction has been added to the pending pool.
1443    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1444        match self {
1445            Self::Pending(tx) => Some(tx),
1446            _ => None,
1447        }
1448    }
1449
1450    /// Returns the replaced transaction if there was one
1451    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1452        match self {
1453            Self::Pending(tx) => tx.replaced.as_ref(),
1454            Self::Parked { replaced, .. } => replaced.as_ref(),
1455        }
1456    }
1457
1458    /// Returns the discarded transactions if there were any
1459    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1460        match self {
1461            Self::Pending(tx) => Some(&tx.discarded),
1462            Self::Parked { .. } => None,
1463        }
1464    }
1465
1466    /// Returns the hash of the replaced transaction if it is a blob transaction.
1467    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1468        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1469    }
1470
1471    /// Returns the hash of the transaction
1472    pub fn hash(&self) -> &TxHash {
1473        match self {
1474            Self::Pending(tx) => tx.transaction.hash(),
1475            Self::Parked { transaction, .. } => transaction.hash(),
1476        }
1477    }
1478
1479    /// Converts this type into the event type for listeners
1480    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1481        match self {
1482            Self::Pending(tx) => {
1483                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1484            }
1485            Self::Parked { transaction, subpool, .. } => {
1486                NewTransactionEvent { transaction, subpool }
1487            }
1488        }
1489    }
1490
1491    /// Returns the subpool this transaction was added to
1492    pub(crate) const fn subpool(&self) -> SubPool {
1493        match self {
1494            Self::Pending(_) => SubPool::Pending,
1495            Self::Parked { subpool, .. } => *subpool,
1496        }
1497    }
1498
1499    /// Returns the [`TransactionId`] of the added transaction
1500    #[cfg(test)]
1501    pub(crate) fn id(&self) -> &TransactionId {
1502        match self {
1503            Self::Pending(added) => added.transaction.id(),
1504            Self::Parked { transaction, .. } => transaction.id(),
1505        }
1506    }
1507
1508    /// Returns the queued reason if the transaction is parked with a queued reason.
1509    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1510        match self {
1511            Self::Pending(_) => None,
1512            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1513        }
1514    }
1515
1516    /// Returns the transaction state based on the subpool and queued reason.
1517    pub fn transaction_state(&self) -> AddedTransactionState {
1518        match self.subpool() {
1519            SubPool::Pending => AddedTransactionState::Pending,
1520            _ => {
1521                // For non-pending transactions, use the queued reason directly from the
1522                // AddedTransaction
1523                if let Some(reason) = self.queued_reason() {
1524                    AddedTransactionState::Queued(reason.clone())
1525                } else {
1526                    // Fallback - this shouldn't happen with the new implementation
1527                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1528                }
1529            }
1530        }
1531    }
1532}
1533
/// Why a transaction is queued, i.e. not ready for execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// There is a nonce gap: prior transactions are missing.
    NonceGap,
    /// The transaction has parked ancestors and waits for other transactions to be mined.
    ParkedAncestors,
    /// The sender's balance is insufficient to cover the transaction cost.
    InsufficientBalance,
    /// The transaction exceeds the block gas limit.
    TooMuchGas,
    /// The transaction doesn't meet the base fee requirement.
    InsufficientBaseFee,
    /// The transaction doesn't meet the blob fee requirement (EIP-4844).
    InsufficientBlobFee,
}
1550
1551/// The state of a transaction when is was added to the pool
1552#[derive(Debug, Clone, PartialEq, Eq)]
1553pub enum AddedTransactionState {
1554    /// Ready for execution
1555    Pending,
1556    /// Not ready for execution due to a specific condition
1557    Queued(QueuedReason),
1558}
1559
1560impl AddedTransactionState {
1561    /// Returns whether the transaction was submitted as queued.
1562    pub const fn is_queued(&self) -> bool {
1563        matches!(self, Self::Queued(_))
1564    }
1565
1566    /// Returns whether the transaction was submitted as pending.
1567    pub const fn is_pending(&self) -> bool {
1568        matches!(self, Self::Pending)
1569    }
1570
1571    /// Returns the specific queued reason if the transaction is queued.
1572    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1573        match self {
1574            Self::Queued(reason) => Some(reason),
1575            Self::Pending => None,
1576        }
1577    }
1578}
1579
1580/// The outcome of a successful transaction addition
1581#[derive(Debug, Clone, PartialEq, Eq)]
1582pub struct AddedTransactionOutcome {
1583    /// The hash of the transaction
1584    pub hash: TxHash,
1585    /// The state of the transaction
1586    pub state: AddedTransactionState,
1587}
1588
1589impl AddedTransactionOutcome {
1590    /// Returns whether the transaction was submitted as queued.
1591    pub const fn is_queued(&self) -> bool {
1592        self.state.is_queued()
1593    }
1594
1595    /// Returns whether the transaction was submitted as pending.
1596    pub const fn is_pending(&self) -> bool {
1597        self.state.is_pending()
1598    }
1599}
1600
1601/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1602#[derive(Debug)]
1603pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1604    /// Hash of the block.
1605    pub(crate) block_hash: B256,
1606    /// All mined transactions.
1607    pub(crate) mined: Vec<TxHash>,
1608    /// Transactions promoted to the pending pool.
1609    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1610    /// transaction that were discarded during the update
1611    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1612}
1613
1614impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1615    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1616    /// [`TransactionListenerKind`].
1617    ///
1618    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1619    /// are allowed to be propagated are returned.
1620    pub(crate) fn pending_transactions(
1621        &self,
1622        kind: TransactionListenerKind,
1623    ) -> impl Iterator<Item = B256> + '_ {
1624        let iter = self.promoted.iter();
1625        PendingTransactionIter { kind, iter }
1626    }
1627
1628    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1629    /// [`TransactionListenerKind`].
1630    ///
1631    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1632    /// are allowed to be propagated are returned.
1633    pub(crate) fn full_pending_transactions(
1634        &self,
1635        kind: TransactionListenerKind,
1636    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1637        let iter = self.promoted.iter();
1638        FullPendingTransactionIter { kind, iter }
1639    }
1640}
1641
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool limit allows and checks that the pool
    /// ends up holding exactly the limit, with a blob store equal to the reference store.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        let blobs = {
            // Read the contents of the JSON fixture into a string.
            let json_content = fs::read_to_string(
                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
            )
            .expect("Failed to read the blob data file");

            // Parse the JSON contents into a serde_json::Value.
            let json_value: serde_json::Value =
                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");

            // Extract the blob data from the JSON as a single hex string.
            vec![
                // The "data" field holds the blob and must be a string.
                json_value
                    .get("data")
                    .expect("Missing `data` field in blob JSON")
                    .as_str()
                    .expect("Data is not a valid string")
                    .to_string(),
            ]
        };

        // Generate a BlobTransactionSidecar from the blobs.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
        );

        // Define the maximum limit for blobs in the sub-pool.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Create a test pool with default configuration and the specified blob limit.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        // Set the block info for the pool, including a pending blob fee.
        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Create an in-memory blob store used as the expected reference.
        let blob_store = InMemoryBlobStore::default();

        // Add ten more transactions than the limit allows.
        for n in 0..blob_limit.max_txs + 10 {
            // Create a mock transaction with the generated blob sidecar.
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Set non zero size
            tx.set_size(1844674407370951);

            // Only the first `max_txs` sidecars go into the reference store.
            if n < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            // Add the transaction to the pool with external origin and valid outcome.
            test_pool.add_transactions(
                TransactionOrigin::External,
                [TransactionValidationOutcome::Valid {
                    balance: U256::from(1_000),
                    state_nonce: 0,
                    bytecode_hash: None,
                    transaction: ValidTransaction::ValidWithSidecar {
                        transaction: tx,
                        sidecar: sidecar.clone(),
                    },
                    propagate: true,
                    authorities: None,
                }],
            );
        }

        // The blob sub-pool holds exactly the configured maximum number of transactions.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // The reported blob size is the per-transaction size times the 1000 retained transactions.
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // The pool's blob store matches the reference blob store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    /// Checks that an authority passed alongside a valid EIP-7702 transaction is registered in the
    /// pool's sender identifiers.
    #[test]
    fn test_auths_stored_in_identifiers() {
        // Create a test pool with default configuration.
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let auth = Address::new([1; 20]);
        let tx = MockTransaction::eip7702();

        test_pool.add_transactions(
            TransactionOrigin::Local,
            [TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(tx),
                propagate: true,
                authorities: Some(vec![auth]),
            }],
        );

        // NOTE(review): id 1 presumably follows the transaction sender taking id 0 — confirm.
        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
    }

    /// Checks that sender-based queries and removals for an address the pool has never seen
    /// return empty results and leave the address without a sender id.
    #[test]
    fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
        let sender = Address::new([9; 20]);

        // The address is unknown before any query runs.
        assert_eq!(test_pool.sender_id(&sender), None);
        assert!(test_pool.get_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_pending_transaction_by_sender_and_nonce(sender, 0).is_none());
        assert!(test_pool.get_queued_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_pending_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_highest_transaction_by_sender(sender).is_none());
        assert!(test_pool.get_highest_consecutive_transaction_by_sender(sender, 0).is_none());
        assert!(test_pool.remove_transactions_by_sender(sender).is_empty());
        // ...and is still unknown afterwards.
        assert_eq!(test_pool.sender_id(&sender), None);
    }
}