reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
10//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
11//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the
12//! state of a transaction needs to be reevaluated again.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use best::BestTransactions;
93use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
94use reth_eth_wire_types::HandleMempoolData;
95use reth_execution_types::ChangedAccount;
96
97use alloy_eips::{eip4844::BlobTransactionSidecar, Typed2718};
98use reth_primitives_traits::Recovered;
99use rustc_hash::FxHashMap;
100use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
101use tokio::sync::mpsc;
102use tracing::{debug, trace, warn};
103mod events;
104pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
105pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
106pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
107pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
108pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
109pub use pending::PendingPool;
110use reth_primitives_traits::Block;
111
112mod best;
113mod blob;
114mod listener;
115mod parked;
116pub(crate) mod pending;
117pub(crate) mod size;
118pub(crate) mod state;
119pub mod txpool;
120mod update;
121
122/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
123pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
124/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
125pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
126
127const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
128
129/// Transaction pool internals.
130pub struct PoolInner<V, T, S>
131where
132    T: TransactionOrdering,
133{
134    /// Internal mapping of addresses to plain ints.
135    identifiers: RwLock<SenderIdentifiers>,
136    /// Transaction validator.
137    validator: V,
138    /// Storage for blob transactions
139    blob_store: S,
140    /// The internal pool that manages all transactions.
141    pool: RwLock<TxPool<T>>,
142    /// Pool settings.
143    config: PoolConfig,
144    /// Manages listeners for transaction state change events.
145    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
146    /// Listeners for new _full_ pending transactions.
147    pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
148    /// Listeners for new transactions added to the pool.
149    transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
150    /// Listener for new blob transaction sidecars added to the pool.
151    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
152    /// Metrics for the blob store
153    blob_store_metrics: BlobStoreMetrics,
154}
155
156// === impl PoolInner ===
157
158impl<V, T, S> PoolInner<V, T, S>
159where
160    V: TransactionValidator,
161    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
162    S: BlobStore,
163{
164    /// Create a new transaction pool instance.
165    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
166        Self {
167            identifiers: Default::default(),
168            validator,
169            event_listener: Default::default(),
170            pool: RwLock::new(TxPool::new(ordering, config.clone())),
171            pending_transaction_listener: Default::default(),
172            transaction_listener: Default::default(),
173            blob_transaction_sidecar_listener: Default::default(),
174            config,
175            blob_store,
176            blob_store_metrics: Default::default(),
177        }
178    }
179
180    /// Returns the configured blob store.
181    pub const fn blob_store(&self) -> &S {
182        &self.blob_store
183    }
184
185    /// Returns stats about the size of the pool.
186    pub fn size(&self) -> PoolSize {
187        self.get_pool_data().size()
188    }
189
190    /// Returns the currently tracked block
191    pub fn block_info(&self) -> BlockInfo {
192        self.get_pool_data().block_info()
193    }
194    /// Sets the currently tracked block
195    pub fn set_block_info(&self, info: BlockInfo) {
196        self.pool.write().set_block_info(info)
197    }
198
199    /// Returns the internal [`SenderId`] for this address
200    pub fn get_sender_id(&self, addr: Address) -> SenderId {
201        self.identifiers.write().sender_id_or_create(addr)
202    }
203
204    /// Returns all senders in the pool
205    pub fn unique_senders(&self) -> HashSet<Address> {
206        self.get_pool_data().unique_senders()
207    }
208
209    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
210    /// used for accounts)
211    fn changed_senders(
212        &self,
213        accs: impl Iterator<Item = ChangedAccount>,
214    ) -> FxHashMap<SenderId, SenderInfo> {
215        let mut identifiers = self.identifiers.write();
216        accs.into_iter()
217            .map(|acc| {
218                let ChangedAccount { address, nonce, balance } = acc;
219                let sender_id = identifiers.sender_id_or_create(address);
220                (sender_id, SenderInfo { state_nonce: nonce, balance })
221            })
222            .collect()
223    }
224
225    /// Get the config the pool was configured with.
226    pub const fn config(&self) -> &PoolConfig {
227        &self.config
228    }
229
230    /// Get the validator reference.
231    pub const fn validator(&self) -> &V {
232        &self.validator
233    }
234
235    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
236    /// transaction inserted into the pool
237    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
238        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
239        let listener = PendingTransactionHashListener { sender, kind };
240        self.pending_transaction_listener.lock().push(listener);
241        rx
242    }
243
244    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
245    pub fn add_new_transaction_listener(
246        &self,
247        kind: TransactionListenerKind,
248    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
249        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
250        let listener = TransactionListener { sender, kind };
251        self.transaction_listener.lock().push(listener);
252        rx
253    }
254    /// Adds a new blob sidecar listener to the pool that gets notified about every new
255    /// eip4844 transaction's blob sidecar.
256    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
257        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
258        let listener = BlobTransactionSidecarListener { sender };
259        self.blob_transaction_sidecar_listener.lock().push(listener);
260        rx
261    }
262
263    /// If the pool contains the transaction, this adds a new listener that gets notified about
264    /// transaction events.
265    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
266        self.get_pool_data()
267            .contains(&tx_hash)
268            .then(|| self.event_listener.write().subscribe(tx_hash))
269    }
270
271    /// Adds a listener for all transaction events.
272    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
273        self.event_listener.write().subscribe_all()
274    }
275
276    /// Returns a read lock to the pool's data.
277    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
278        self.pool.read()
279    }
280
281    /// Returns hashes of _all_ transactions in the pool.
282    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
283        self.get_pool_data()
284            .all()
285            .transactions_iter()
286            .filter(|tx| tx.propagate)
287            .map(|tx| *tx.hash())
288            .collect()
289    }
290
291    /// Returns _all_ transactions in the pool.
292    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
293        self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
294    }
295
296    /// Returns only the first `max` transactions in the pool.
297    pub fn pooled_transactions_max(
298        &self,
299        max: usize,
300    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
301        self.get_pool_data()
302            .all()
303            .transactions_iter()
304            .filter(|tx| tx.propagate)
305            .take(max)
306            .cloned()
307            .collect()
308    }
309
310    /// Converts the internally tracked transaction to the pooled format.
311    ///
312    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
313    /// store and attached to the transaction.
314    fn to_pooled_transaction(
315        &self,
316        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
317    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
318    where
319        <V as TransactionValidator>::Transaction: EthPoolTransaction,
320    {
321        if transaction.is_eip4844() {
322            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
323            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
324        } else {
325            transaction
326                .transaction
327                .clone()
328                .try_into_pooled()
329                .inspect_err(|err| {
330                    debug!(
331                        target: "txpool", %err,
332                        "failed to convert transaction to pooled element; skipping",
333                    );
334                })
335                .ok()
336        }
337    }
338
339    /// Returns pooled transactions for the given transaction hashes.
340    pub fn get_pooled_transaction_elements(
341        &self,
342        tx_hashes: Vec<TxHash>,
343        limit: GetPooledTransactionLimit,
344    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
345    where
346        <V as TransactionValidator>::Transaction: EthPoolTransaction,
347    {
348        let transactions = self.get_all(tx_hashes);
349        let mut elements = Vec::with_capacity(transactions.len());
350        let mut size = 0;
351        for transaction in transactions {
352            let encoded_len = transaction.encoded_length();
353            let Some(pooled) = self.to_pooled_transaction(transaction) else {
354                continue;
355            };
356
357            size += encoded_len;
358            elements.push(pooled.into_inner());
359
360            if limit.exceeds(size) {
361                break
362            }
363        }
364
365        elements
366    }
367
368    /// Returns converted pooled transaction for the given transaction hash.
369    pub fn get_pooled_transaction_element(
370        &self,
371        tx_hash: TxHash,
372    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
373    where
374        <V as TransactionValidator>::Transaction: EthPoolTransaction,
375    {
376        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
377    }
378
379    /// Updates the entire pool after a new block was executed.
380    pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
381    where
382        B: Block,
383    {
384        trace!(target: "txpool", ?update, "updating pool on canonical state change");
385
386        let block_info = update.block_info();
387        let CanonicalStateUpdate {
388            new_tip, changed_accounts, mined_transactions, update_kind, ..
389        } = update;
390        self.validator.on_new_head_block(new_tip);
391
392        let changed_senders = self.changed_senders(changed_accounts.into_iter());
393
394        // update the pool
395        let outcome = self.pool.write().on_canonical_state_change(
396            block_info,
397            mined_transactions,
398            changed_senders,
399            update_kind,
400        );
401
402        // This will discard outdated transactions based on the account's nonce
403        self.delete_discarded_blobs(outcome.discarded.iter());
404
405        // notify listeners about updates
406        self.notify_on_new_state(outcome);
407    }
408
409    /// Performs account updates on the pool.
410    ///
411    /// This will either promote or discard transactions based on the new account state.
412    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
413        let changed_senders = self.changed_senders(accounts.into_iter());
414        let UpdateOutcome { promoted, discarded } =
415            self.pool.write().update_accounts(changed_senders);
416        let mut listener = self.event_listener.write();
417
418        promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
419        discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
420
421        // This deletes outdated blob txs from the blob store, based on the account's nonce. This is
422        // called during txpool maintenance when the pool drifted.
423        self.delete_discarded_blobs(discarded.iter());
424    }
425
426    /// Add a single validated transaction into the pool.
427    ///
428    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
429    /// come in through that function, either as a batch or `std::iter::once`.
430    fn add_transaction(
431        &self,
432        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
433        origin: TransactionOrigin,
434        tx: TransactionValidationOutcome<T::Transaction>,
435    ) -> PoolResult<TxHash> {
436        match tx {
437            TransactionValidationOutcome::Valid {
438                balance,
439                state_nonce,
440                transaction,
441                propagate,
442            } => {
443                let sender_id = self.get_sender_id(transaction.sender());
444                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
445
446                // split the valid transaction and the blob sidecar if it has any
447                let (transaction, maybe_sidecar) = match transaction {
448                    ValidTransaction::Valid(tx) => (tx, None),
449                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
450                        debug_assert!(
451                            transaction.is_eip4844(),
452                            "validator returned sidecar for non EIP-4844 transaction"
453                        );
454                        (transaction, Some(sidecar))
455                    }
456                };
457
458                let tx = ValidPoolTransaction {
459                    transaction,
460                    transaction_id,
461                    propagate,
462                    timestamp: Instant::now(),
463                    origin,
464                };
465
466                let added = pool.add_transaction(tx, balance, state_nonce)?;
467                let hash = *added.hash();
468
469                // transaction was successfully inserted into the pool
470                if let Some(sidecar) = maybe_sidecar {
471                    // notify blob sidecar listeners
472                    self.on_new_blob_sidecar(&hash, &sidecar);
473                    // store the sidecar in the blob store
474                    self.insert_blob(hash, sidecar);
475                }
476
477                if let Some(replaced) = added.replaced_blob_transaction() {
478                    debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
479                    // delete the replaced transaction from the blob store
480                    self.delete_blob(replaced);
481                }
482
483                // Notify about new pending transactions
484                if let Some(pending) = added.as_pending() {
485                    self.on_new_pending_transaction(pending);
486                }
487
488                // Notify tx event listeners
489                self.notify_event_listeners(&added);
490
491                if let Some(discarded) = added.discarded_transactions() {
492                    self.delete_discarded_blobs(discarded.iter());
493                }
494
495                // Notify listeners for _all_ transactions
496                self.on_new_transaction(added.into_new_transaction_event());
497
498                Ok(hash)
499            }
500            TransactionValidationOutcome::Invalid(tx, err) => {
501                let mut listener = self.event_listener.write();
502                listener.invalid(tx.hash());
503                Err(PoolError::new(*tx.hash(), err))
504            }
505            TransactionValidationOutcome::Error(tx_hash, err) => {
506                let mut listener = self.event_listener.write();
507                listener.discarded(&tx_hash);
508                Err(PoolError::other(tx_hash, err))
509            }
510        }
511    }
512
513    /// Adds a transaction and returns the event stream.
514    pub fn add_transaction_and_subscribe(
515        &self,
516        origin: TransactionOrigin,
517        tx: TransactionValidationOutcome<T::Transaction>,
518    ) -> PoolResult<TransactionEvents> {
519        let listener = {
520            let mut listener = self.event_listener.write();
521            listener.subscribe(tx.tx_hash())
522        };
523        let mut results = self.add_transactions(origin, std::iter::once(tx));
524        results.pop().expect("result length is the same as the input")?;
525        Ok(listener)
526    }
527
528    /// Adds all transactions in the iterator to the pool, returning a list of results.
529    ///
530    /// Note: A large batch may lock the pool for a long time that blocks important operations
531    /// like updating the pool on canonical state changes. The caller should consider having
532    /// a max batch size to balance transaction insertions with other updates.
533    pub fn add_transactions(
534        &self,
535        origin: TransactionOrigin,
536        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
537    ) -> Vec<PoolResult<TxHash>> {
538        // Add the transactions and enforce the pool size limits in one write lock
539        let (mut added, discarded) = {
540            let mut pool = self.pool.write();
541            let added = transactions
542                .into_iter()
543                .map(|tx| self.add_transaction(&mut pool, origin, tx))
544                .collect::<Vec<_>>();
545
546            // Enforce the pool size limits if at least one transaction was added successfully
547            let discarded = if added.iter().any(Result::is_ok) {
548                pool.discard_worst()
549            } else {
550                Default::default()
551            };
552
553            (added, discarded)
554        };
555
556        if !discarded.is_empty() {
557            // Delete any blobs associated with discarded blob transactions
558            self.delete_discarded_blobs(discarded.iter());
559
560            let discarded_hashes =
561                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
562
563            {
564                let mut listener = self.event_listener.write();
565                discarded_hashes.iter().for_each(|hash| listener.discarded(hash));
566            }
567
568            // A newly added transaction may be immediately discarded, so we need to
569            // adjust the result here
570            for res in &mut added {
571                if let Ok(hash) = res {
572                    if discarded_hashes.contains(hash) {
573                        *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
574                    }
575                }
576            }
577        }
578
579        added
580    }
581
582    /// Notify all listeners about a new pending transaction.
583    fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
584        let propagate_allowed = pending.is_propagate_allowed();
585
586        let mut transaction_listeners = self.pending_transaction_listener.lock();
587        transaction_listeners.retain_mut(|listener| {
588            if listener.kind.is_propagate_only() && !propagate_allowed {
589                // only emit this hash to listeners that are only allowed to receive propagate only
590                // transactions, such as network
591                return !listener.sender.is_closed()
592            }
593
594            // broadcast all pending transactions to the listener
595            listener.send_all(pending.pending_transactions(listener.kind))
596        });
597    }
598
599    /// Notify all listeners about a newly inserted pending transaction.
600    fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
601        let mut transaction_listeners = self.transaction_listener.lock();
602        transaction_listeners.retain_mut(|listener| {
603            if listener.kind.is_propagate_only() && !event.transaction.propagate {
604                // only emit this hash to listeners that are only allowed to receive propagate only
605                // transactions, such as network
606                return !listener.sender.is_closed()
607            }
608
609            listener.send(event.clone())
610        });
611    }
612
613    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
614    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecar) {
615        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
616        if sidecar_listeners.is_empty() {
617            return
618        }
619        let sidecar = Arc::new(sidecar.clone());
620        sidecar_listeners.retain_mut(|listener| {
621            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
622            match listener.sender.try_send(new_blob_event) {
623                Ok(()) => true,
624                Err(err) => {
625                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
626                        debug!(
627                            target: "txpool",
628                            "[{:?}] failed to send blob sidecar; channel full",
629                            sidecar,
630                        );
631                        true
632                    } else {
633                        false
634                    }
635                }
636            }
637        })
638    }
639
640    /// Notifies transaction listeners about changes once a block was processed.
641    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
642        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
643
644        // notify about promoted pending transactions
645        // emit hashes
646        self.pending_transaction_listener
647            .lock()
648            .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
649
650        // emit full transactions
651        self.transaction_listener.lock().retain_mut(|listener| {
652            listener.send_all(outcome.full_pending_transactions(listener.kind))
653        });
654
655        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
656
657        // broadcast specific transaction events
658        let mut listener = self.event_listener.write();
659
660        mined.iter().for_each(|tx| listener.mined(tx, block_hash));
661        promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
662        discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
663    }
664
665    /// Fire events for the newly added transaction if there are any.
666    fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
667        let mut listener = self.event_listener.write();
668
669        match tx {
670            AddedTransaction::Pending(tx) => {
671                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
672
673                listener.pending(transaction.hash(), replaced.clone());
674                promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
675                discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
676            }
677            AddedTransaction::Parked { transaction, replaced, .. } => {
678                listener.queued(transaction.hash());
679                if let Some(replaced) = replaced {
680                    listener.replaced(replaced.clone(), *transaction.hash());
681                }
682            }
683        }
684    }
685
686    /// Returns an iterator that yields transactions that are ready to be included in the block.
687    pub fn best_transactions(&self) -> BestTransactions<T> {
688        self.get_pool_data().best_transactions()
689    }
690
691    /// Returns an iterator that yields transactions that are ready to be included in the block with
692    /// the given base fee and optional blob fee attributes.
693    pub fn best_transactions_with_attributes(
694        &self,
695        best_transactions_attributes: BestTransactionsAttributes,
696    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
697    {
698        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
699    }
700
701    /// Returns only the first `max` transactions in the pending pool.
702    pub fn pending_transactions_max(
703        &self,
704        max: usize,
705    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
706        self.get_pool_data().pending_transactions_iter().take(max).collect()
707    }
708
709    /// Returns all transactions from the pending sub-pool
710    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
711        self.get_pool_data().pending_transactions()
712    }
713
714    /// Returns all transactions from parked pools
715    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
716        self.get_pool_data().queued_transactions()
717    }
718
719    /// Returns all transactions in the pool
720    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
721        let pool = self.get_pool_data();
722        AllPoolTransactions {
723            pending: pool.pending_transactions(),
724            queued: pool.queued_transactions(),
725        }
726    }
727
728    /// Removes and returns all matching transactions from the pool.
729    ///
730    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
731    /// nonce gap for the given transactions.
732    pub fn remove_transactions(
733        &self,
734        hashes: Vec<TxHash>,
735    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
736        if hashes.is_empty() {
737            return Vec::new()
738        }
739        let removed = self.pool.write().remove_transactions(hashes);
740
741        let mut listener = self.event_listener.write();
742
743        removed.iter().for_each(|tx| listener.discarded(tx.hash()));
744
745        removed
746    }
747
748    /// Removes and returns all matching transactions and their dependent transactions from the
749    /// pool.
750    pub fn remove_transactions_and_descendants(
751        &self,
752        hashes: Vec<TxHash>,
753    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
754        if hashes.is_empty() {
755            return Vec::new()
756        }
757        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
758
759        let mut listener = self.event_listener.write();
760
761        removed.iter().for_each(|tx| listener.discarded(tx.hash()));
762
763        removed
764    }
765
766    /// Removes and returns all transactions by the specified sender from the pool.
767    pub fn remove_transactions_by_sender(
768        &self,
769        sender: Address,
770    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
771        let sender_id = self.get_sender_id(sender);
772        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
773
774        let mut listener = self.event_listener.write();
775
776        removed.iter().for_each(|tx| listener.discarded(tx.hash()));
777
778        removed
779    }
780
781    /// Removes and returns all transactions that are present in the pool.
782    pub fn retain_unknown<A>(&self, announcement: &mut A)
783    where
784        A: HandleMempoolData,
785    {
786        if announcement.is_empty() {
787            return
788        }
789        let pool = self.get_pool_data();
790        announcement.retain_by_hash(|tx| !pool.contains(tx))
791    }
792
793    /// Returns the transaction by hash.
794    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
795        self.get_pool_data().get(tx_hash)
796    }
797
798    /// Returns all transactions of the address
799    pub fn get_transactions_by_sender(
800        &self,
801        sender: Address,
802    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
803        let sender_id = self.get_sender_id(sender);
804        self.get_pool_data().get_transactions_by_sender(sender_id)
805    }
806
807    /// Returns all queued transactions of the address by sender
808    pub fn get_queued_transactions_by_sender(
809        &self,
810        sender: Address,
811    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
812        let sender_id = self.get_sender_id(sender);
813        self.get_pool_data().queued_txs_by_sender(sender_id)
814    }
815
816    /// Returns all pending transactions filtered by predicate
817    pub fn pending_transactions_with_predicate(
818        &self,
819        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
820    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
821        self.get_pool_data().pending_transactions_with_predicate(predicate)
822    }
823
824    /// Returns all pending transactions of the address by sender
825    pub fn get_pending_transactions_by_sender(
826        &self,
827        sender: Address,
828    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
829        let sender_id = self.get_sender_id(sender);
830        self.get_pool_data().pending_txs_by_sender(sender_id)
831    }
832
833    /// Returns the highest transaction of the address
834    pub fn get_highest_transaction_by_sender(
835        &self,
836        sender: Address,
837    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
838        let sender_id = self.get_sender_id(sender);
839        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
840    }
841
842    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
843    pub fn get_highest_consecutive_transaction_by_sender(
844        &self,
845        sender: Address,
846        on_chain_nonce: u64,
847    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
848        let sender_id = self.get_sender_id(sender);
849        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
850            sender_id.into_transaction_id(on_chain_nonce),
851        )
852    }
853
854    /// Returns the transaction given a [`TransactionId`]
855    pub fn get_transaction_by_transaction_id(
856        &self,
857        transaction_id: &TransactionId,
858    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
859        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
860    }
861
862    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
863    pub fn get_transactions_by_origin(
864        &self,
865        origin: TransactionOrigin,
866    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
867        self.get_pool_data()
868            .all()
869            .transactions_iter()
870            .filter(|tx| tx.origin == origin)
871            .cloned()
872            .collect()
873    }
874
875    /// Returns all pending transactions filted by [`TransactionOrigin`]
876    pub fn get_pending_transactions_by_origin(
877        &self,
878        origin: TransactionOrigin,
879    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
880        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
881    }
882
883    /// Returns all the transactions belonging to the hashes.
884    ///
885    /// If no transaction exists, it is skipped.
886    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
887        if txs.is_empty() {
888            return Vec::new()
889        }
890        self.get_pool_data().get_all(txs).collect()
891    }
892
893    /// Notify about propagated transactions.
894    pub fn on_propagated(&self, txs: PropagatedTransactions) {
895        if txs.0.is_empty() {
896            return
897        }
898        let mut listener = self.event_listener.write();
899
900        txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers))
901    }
902
903    /// Number of transactions in the entire pool
904    pub fn len(&self) -> usize {
905        self.get_pool_data().len()
906    }
907
908    /// Whether the pool is empty
909    pub fn is_empty(&self) -> bool {
910        self.get_pool_data().is_empty()
911    }
912
913    /// Returns whether or not the pool is over its configured size and transaction count limits.
914    pub fn is_exceeded(&self) -> bool {
915        self.pool.read().is_exceeded()
916    }
917
918    /// Inserts a blob transaction into the blob store
919    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
920        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
921        if let Err(err) = self.blob_store.insert(hash, blob) {
922            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
923            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
924        }
925        self.update_blob_store_metrics();
926    }
927
928    /// Delete a blob from the blob store
929    pub fn delete_blob(&self, blob: TxHash) {
930        let _ = self.blob_store.delete(blob);
931    }
932
933    /// Delete all blobs from the blob store
934    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
935        let _ = self.blob_store.delete_all(txs);
936    }
937
938    /// Cleans up the blob store
939    pub fn cleanup_blobs(&self) {
940        let stat = self.blob_store.cleanup();
941        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
942        self.update_blob_store_metrics();
943    }
944
945    fn update_blob_store_metrics(&self) {
946        if let Some(data_size) = self.blob_store.data_size_hint() {
947            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
948        }
949        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
950    }
951
952    /// Deletes all blob transactions that were discarded.
953    fn delete_discarded_blobs<'a>(
954        &'a self,
955        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
956    ) {
957        let blob_txs = transactions
958            .into_iter()
959            .filter(|tx| tx.transaction.is_eip4844())
960            .map(|tx| *tx.hash())
961            .collect();
962        self.delete_blobs(blob_txs);
963    }
964}
965
966impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
967    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
968        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
969    }
970}
971
972/// Tracks an added transaction and all graph changes caused by adding it.
973#[derive(Debug, Clone)]
974pub struct AddedPendingTransaction<T: PoolTransaction> {
975    /// Inserted transaction.
976    transaction: Arc<ValidPoolTransaction<T>>,
977    /// Replaced transaction.
978    replaced: Option<Arc<ValidPoolTransaction<T>>>,
979    /// transactions promoted to the pending queue
980    promoted: Vec<Arc<ValidPoolTransaction<T>>>,
981    /// transactions that failed and became discarded
982    discarded: Vec<Arc<ValidPoolTransaction<T>>>,
983}
984
985impl<T: PoolTransaction> AddedPendingTransaction<T> {
986    /// Returns all transactions that were promoted to the pending pool and adhere to the given
987    /// [`TransactionListenerKind`].
988    ///
989    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
990    /// are allowed to be propagated are returned.
991    pub(crate) fn pending_transactions(
992        &self,
993        kind: TransactionListenerKind,
994    ) -> impl Iterator<Item = B256> + '_ {
995        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
996        PendingTransactionIter { kind, iter }
997    }
998
999    /// Returns if the transaction should be propagated.
1000    pub(crate) fn is_propagate_allowed(&self) -> bool {
1001        self.transaction.propagate
1002    }
1003}
1004
1005pub(crate) struct PendingTransactionIter<Iter> {
1006    kind: TransactionListenerKind,
1007    iter: Iter,
1008}
1009
1010impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1011where
1012    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1013    T: PoolTransaction + 'a,
1014{
1015    type Item = B256;
1016
1017    fn next(&mut self) -> Option<Self::Item> {
1018        loop {
1019            let next = self.iter.next()?;
1020            if self.kind.is_propagate_only() && !next.propagate {
1021                continue
1022            }
1023            return Some(*next.hash())
1024        }
1025    }
1026}
1027
1028/// An iterator over full pending transactions
1029pub(crate) struct FullPendingTransactionIter<Iter> {
1030    kind: TransactionListenerKind,
1031    iter: Iter,
1032}
1033
1034impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1035where
1036    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1037    T: PoolTransaction + 'a,
1038{
1039    type Item = NewTransactionEvent<T>;
1040
1041    fn next(&mut self) -> Option<Self::Item> {
1042        loop {
1043            let next = self.iter.next()?;
1044            if self.kind.is_propagate_only() && !next.propagate {
1045                continue
1046            }
1047            return Some(NewTransactionEvent {
1048                subpool: SubPool::Pending,
1049                transaction: next.clone(),
1050            })
1051        }
1052    }
1053}
1054
1055/// Represents a transaction that was added into the pool and its state
1056#[derive(Debug, Clone)]
1057pub enum AddedTransaction<T: PoolTransaction> {
1058    /// Transaction was successfully added and moved to the pending pool.
1059    Pending(AddedPendingTransaction<T>),
1060    /// Transaction was successfully added but not yet ready for processing and moved to a
1061    /// parked pool instead.
1062    Parked {
1063        /// Inserted transaction.
1064        transaction: Arc<ValidPoolTransaction<T>>,
1065        /// Replaced transaction.
1066        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1067        /// The subpool it was moved to.
1068        subpool: SubPool,
1069    },
1070}
1071
1072impl<T: PoolTransaction> AddedTransaction<T> {
1073    /// Returns whether the transaction has been added to the pending pool.
1074    pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1075        match self {
1076            Self::Pending(tx) => Some(tx),
1077            _ => None,
1078        }
1079    }
1080
1081    /// Returns the replaced transaction if there was one
1082    pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1083        match self {
1084            Self::Pending(tx) => tx.replaced.as_ref(),
1085            Self::Parked { replaced, .. } => replaced.as_ref(),
1086        }
1087    }
1088
1089    /// Returns the discarded transactions if there were any
1090    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1091        match self {
1092            Self::Pending(tx) => Some(&tx.discarded),
1093            Self::Parked { .. } => None,
1094        }
1095    }
1096
1097    /// Returns the hash of the replaced transaction if it is a blob transaction.
1098    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1099        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1100    }
1101
1102    /// Returns the hash of the transaction
1103    pub(crate) fn hash(&self) -> &TxHash {
1104        match self {
1105            Self::Pending(tx) => tx.transaction.hash(),
1106            Self::Parked { transaction, .. } => transaction.hash(),
1107        }
1108    }
1109
1110    /// Converts this type into the event type for listeners
1111    pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1112        match self {
1113            Self::Pending(tx) => {
1114                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1115            }
1116            Self::Parked { transaction, subpool, .. } => {
1117                NewTransactionEvent { transaction, subpool }
1118            }
1119        }
1120    }
1121
1122    /// Returns the subpool this transaction was added to
1123    #[cfg(test)]
1124    pub(crate) const fn subpool(&self) -> SubPool {
1125        match self {
1126            Self::Pending(_) => SubPool::Pending,
1127            Self::Parked { subpool, .. } => *subpool,
1128        }
1129    }
1130
1131    /// Returns the [`TransactionId`] of the added transaction
1132    #[cfg(test)]
1133    pub(crate) fn id(&self) -> &TransactionId {
1134        match self {
1135            Self::Pending(added) => added.transaction.id(),
1136            Self::Parked { transaction, .. } => transaction.id(),
1137        }
1138    }
1139}
1140
1141/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1142#[derive(Debug)]
1143pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1144    /// Hash of the block.
1145    pub(crate) block_hash: B256,
1146    /// All mined transactions.
1147    pub(crate) mined: Vec<TxHash>,
1148    /// Transactions promoted to the pending pool.
1149    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1150    /// transaction that were discarded during the update
1151    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1152}
1153
1154impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1155    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1156    /// [`TransactionListenerKind`].
1157    ///
1158    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1159    /// are allowed to be propagated are returned.
1160    pub(crate) fn pending_transactions(
1161        &self,
1162        kind: TransactionListenerKind,
1163    ) -> impl Iterator<Item = B256> + '_ {
1164        let iter = self.promoted.iter();
1165        PendingTransactionIter { kind, iter }
1166    }
1167
1168    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1169    /// [`TransactionListenerKind`].
1170    ///
1171    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1172    /// are allowed to be propagated are returned.
1173    pub(crate) fn full_pending_transactions(
1174        &self,
1175        kind: TransactionListenerKind,
1176    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1177        let iter = self.promoted.iter();
1178        FullPendingTransactionIter { kind, iter }
1179    }
1180}
1181
1182#[cfg(test)]
1183mod tests {
1184    use crate::{
1185        blobstore::{BlobStore, InMemoryBlobStore},
1186        test_utils::{MockTransaction, TestPoolBuilder},
1187        validate::ValidTransaction,
1188        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1189    };
1190    use alloy_eips::eip4844::BlobTransactionSidecar;
1191    use std::{fs, path::PathBuf};
1192
1193    #[test]
1194    fn test_discard_blobs_on_blob_tx_eviction() {
1195        let blobs = {
1196            // Read the contents of the JSON file into a string.
1197            let json_content = fs::read_to_string(
1198                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1199            )
1200            .expect("Failed to read the blob data file");
1201
1202            // Parse the JSON contents into a serde_json::Value.
1203            let json_value: serde_json::Value =
1204                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1205
1206            // Extract blob data from JSON and convert it to Blob.
1207            vec![
1208                // Extract the "data" field from the JSON and parse it as a string.
1209                json_value
1210                    .get("data")
1211                    .unwrap()
1212                    .as_str()
1213                    .expect("Data is not a valid string")
1214                    .to_string(),
1215            ]
1216        };
1217
1218        // Generate a BlobTransactionSidecar from the blobs.
1219        let sidecar = BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap();
1220
1221        // Define the maximum limit for blobs in the sub-pool.
1222        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1223
1224        // Create a test pool with default configuration and the specified blob limit.
1225        let test_pool = &TestPoolBuilder::default()
1226            .with_config(PoolConfig { blob_limit, ..Default::default() })
1227            .pool;
1228
1229        // Set the block info for the pool, including a pending blob fee.
1230        test_pool
1231            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1232
1233        // Create an in-memory blob store.
1234        let blob_store = InMemoryBlobStore::default();
1235
1236        // Loop to add transactions to the pool and test blob eviction.
1237        for n in 0..blob_limit.max_txs + 10 {
1238            // Create a mock transaction with the generated blob sidecar.
1239            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1240
1241            // Set non zero size
1242            tx.set_size(1844674407370951);
1243
1244            // Insert the sidecar into the blob store if the current index is within the blob limit.
1245            if n < blob_limit.max_txs {
1246                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1247            }
1248
1249            // Add the transaction to the pool with external origin and valid outcome.
1250            test_pool.add_transactions(
1251                TransactionOrigin::External,
1252                [TransactionValidationOutcome::Valid {
1253                    balance: U256::from(1_000),
1254                    state_nonce: 0,
1255                    transaction: ValidTransaction::ValidWithSidecar {
1256                        transaction: tx,
1257                        sidecar: sidecar.clone(),
1258                    },
1259                    propagate: true,
1260                }],
1261            );
1262        }
1263
1264        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
1265        assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1266
1267        // Assert that the size of the pool's blob_size component matches the expected value.
1268        assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1269
1270        // Assert that the pool's blob store matches the expected blob store.
1271        assert_eq!(*test_pool.blob_store(), blob_store);
1272    }
1273}