reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
10//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
11//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the
12//! state of a transaction needs to be reevaluated again.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    error::{PoolError, PoolErrorKind, PoolResult},
70    identifier::{SenderId, SenderIdentifiers, TransactionId},
71    pool::{
72        listener::PoolEventBroadcast,
73        state::SubPool,
74        txpool::{SenderInfo, TxPool},
75    },
76    traits::{
77        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, NewTransactionEvent, PoolSize,
78        PoolTransaction, PropagatedTransactions, TransactionOrigin,
79    },
80    validate::{TransactionValidationOutcome, ValidPoolTransaction},
81    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
82    TransactionValidator,
83};
84
85use alloy_primitives::{Address, TxHash, B256};
86use best::BestTransactions;
87use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
88use reth_eth_wire_types::HandleMempoolData;
89use reth_execution_types::ChangedAccount;
90
91use alloy_eips::{eip4844::BlobTransactionSidecar, Typed2718};
92use reth_primitives_traits::Recovered;
93use rustc_hash::FxHashMap;
94use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
95use tokio::sync::mpsc;
96use tracing::{debug, trace, warn};
97mod events;
98use crate::{
99    blobstore::BlobStore,
100    metrics::BlobStoreMetrics,
101    pool::txpool::UpdateOutcome,
102    traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind},
103    validate::ValidTransaction,
104};
105pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
106pub use blob::{blob_tx_priority, fee_delta};
107pub use events::{FullTransactionEvent, TransactionEvent};
108pub use listener::{AllTransactionsEvents, TransactionEvents};
109pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
110pub use pending::PendingPool;
111use reth_primitives_traits::Block;
112
113mod best;
114mod blob;
115mod listener;
116mod parked;
117pub(crate) mod pending;
118pub(crate) mod size;
119pub(crate) mod state;
120pub mod txpool;
121mod update;
122
123/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
124pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
125/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
126pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
127
128const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
129
130/// Transaction pool internals.
131pub struct PoolInner<V, T, S>
132where
133    T: TransactionOrdering,
134{
135    /// Internal mapping of addresses to plain ints.
136    identifiers: RwLock<SenderIdentifiers>,
137    /// Transaction validation.
138    validator: V,
139    /// Storage for blob transactions
140    blob_store: S,
141    /// The internal pool that manages all transactions.
142    pool: RwLock<TxPool<T>>,
143    /// Pool settings.
144    config: PoolConfig,
145    /// Manages listeners for transaction state change events.
146    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
147    /// Listeners for new _full_ pending transactions.
148    pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
149    /// Listeners for new transactions added to the pool.
150    transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
151    /// Listener for new blob transaction sidecars added to the pool.
152    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
153    /// Metrics for the blob store
154    blob_store_metrics: BlobStoreMetrics,
155}
156
157// === impl PoolInner ===
158
159impl<V, T, S> PoolInner<V, T, S>
160where
161    V: TransactionValidator,
162    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
163    S: BlobStore,
164{
165    /// Create a new transaction pool instance.
166    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
167        Self {
168            identifiers: Default::default(),
169            validator,
170            event_listener: Default::default(),
171            pool: RwLock::new(TxPool::new(ordering, config.clone())),
172            pending_transaction_listener: Default::default(),
173            transaction_listener: Default::default(),
174            blob_transaction_sidecar_listener: Default::default(),
175            config,
176            blob_store,
177            blob_store_metrics: Default::default(),
178        }
179    }
180
181    /// Returns the configured blob store.
182    pub const fn blob_store(&self) -> &S {
183        &self.blob_store
184    }
185
186    /// Returns stats about the size of the pool.
187    pub fn size(&self) -> PoolSize {
188        self.get_pool_data().size()
189    }
190
191    /// Returns the currently tracked block
192    pub fn block_info(&self) -> BlockInfo {
193        self.get_pool_data().block_info()
194    }
195    /// Sets the currently tracked block
196    pub fn set_block_info(&self, info: BlockInfo) {
197        self.pool.write().set_block_info(info)
198    }
199
200    /// Returns the internal [`SenderId`] for this address
201    pub fn get_sender_id(&self, addr: Address) -> SenderId {
202        self.identifiers.write().sender_id_or_create(addr)
203    }
204
205    /// Returns all senders in the pool
206    pub fn unique_senders(&self) -> HashSet<Address> {
207        self.get_pool_data().unique_senders()
208    }
209
210    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
211    /// used for accounts)
212    fn changed_senders(
213        &self,
214        accs: impl Iterator<Item = ChangedAccount>,
215    ) -> FxHashMap<SenderId, SenderInfo> {
216        let mut identifiers = self.identifiers.write();
217        accs.into_iter()
218            .map(|acc| {
219                let ChangedAccount { address, nonce, balance } = acc;
220                let sender_id = identifiers.sender_id_or_create(address);
221                (sender_id, SenderInfo { state_nonce: nonce, balance })
222            })
223            .collect()
224    }
225
226    /// Get the config the pool was configured with.
227    pub const fn config(&self) -> &PoolConfig {
228        &self.config
229    }
230
231    /// Get the validator reference.
232    pub const fn validator(&self) -> &V {
233        &self.validator
234    }
235
236    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
237    /// transaction inserted into the pool
238    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
239        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
240        let listener = PendingTransactionHashListener { sender, kind };
241        self.pending_transaction_listener.lock().push(listener);
242        rx
243    }
244
245    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
246    pub fn add_new_transaction_listener(
247        &self,
248        kind: TransactionListenerKind,
249    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
250        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
251        let listener = TransactionListener { sender, kind };
252        self.transaction_listener.lock().push(listener);
253        rx
254    }
255    /// Adds a new blob sidecar listener to the pool that gets notified about every new
256    /// eip4844 transaction's blob sidecar.
257    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
258        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
259        let listener = BlobTransactionSidecarListener { sender };
260        self.blob_transaction_sidecar_listener.lock().push(listener);
261        rx
262    }
263
264    /// If the pool contains the transaction, this adds a new listener that gets notified about
265    /// transaction events.
266    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
267        self.get_pool_data()
268            .contains(&tx_hash)
269            .then(|| self.event_listener.write().subscribe(tx_hash))
270    }
271
272    /// Adds a listener for all transaction events.
273    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
274        self.event_listener.write().subscribe_all()
275    }
276
277    /// Returns a read lock to the pool's data.
278    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
279        self.pool.read()
280    }
281
282    /// Returns hashes of _all_ transactions in the pool.
283    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
284        self.get_pool_data()
285            .all()
286            .transactions_iter()
287            .filter(|tx| tx.propagate)
288            .map(|tx| *tx.hash())
289            .collect()
290    }
291
292    /// Returns _all_ transactions in the pool.
293    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
294        self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
295    }
296
297    /// Returns only the first `max` transactions in the pool.
298    pub fn pooled_transactions_max(
299        &self,
300        max: usize,
301    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
302        self.get_pool_data()
303            .all()
304            .transactions_iter()
305            .filter(|tx| tx.propagate)
306            .take(max)
307            .cloned()
308            .collect()
309    }
310
311    /// Converts the internally tracked transaction to the pooled format.
312    ///
313    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
314    /// store and attached to the transaction.
315    fn to_pooled_transaction(
316        &self,
317        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
318    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
319    where
320        <V as TransactionValidator>::Transaction: EthPoolTransaction,
321    {
322        if transaction.is_eip4844() {
323            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
324            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
325        } else {
326            transaction
327                .transaction
328                .clone()
329                .try_into_pooled()
330                .inspect_err(|err| {
331                    debug!(
332                        target: "txpool", %err,
333                        "failed to convert transaction to pooled element; skipping",
334                    );
335                })
336                .ok()
337        }
338    }
339
340    /// Returns pooled transactions for the given transaction hashes.
341    pub fn get_pooled_transaction_elements(
342        &self,
343        tx_hashes: Vec<TxHash>,
344        limit: GetPooledTransactionLimit,
345    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
346    where
347        <V as TransactionValidator>::Transaction: EthPoolTransaction,
348    {
349        let transactions = self.get_all(tx_hashes);
350        let mut elements = Vec::with_capacity(transactions.len());
351        let mut size = 0;
352        for transaction in transactions {
353            let encoded_len = transaction.encoded_length();
354            let Some(pooled) = self.to_pooled_transaction(transaction) else {
355                continue;
356            };
357
358            size += encoded_len;
359            elements.push(pooled.into_inner());
360
361            if limit.exceeds(size) {
362                break
363            }
364        }
365
366        elements
367    }
368
369    /// Returns converted pooled transaction for the given transaction hash.
370    pub fn get_pooled_transaction_element(
371        &self,
372        tx_hash: TxHash,
373    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
374    where
375        <V as TransactionValidator>::Transaction: EthPoolTransaction,
376    {
377        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
378    }
379
380    /// Updates the entire pool after a new block was executed.
381    pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
382    where
383        B: Block,
384    {
385        trace!(target: "txpool", ?update, "updating pool on canonical state change");
386
387        let block_info = update.block_info();
388        let CanonicalStateUpdate {
389            new_tip, changed_accounts, mined_transactions, update_kind, ..
390        } = update;
391        self.validator.on_new_head_block(new_tip);
392
393        let changed_senders = self.changed_senders(changed_accounts.into_iter());
394
395        // update the pool
396        let outcome = self.pool.write().on_canonical_state_change(
397            block_info,
398            mined_transactions,
399            changed_senders,
400            update_kind,
401        );
402
403        // This will discard outdated transactions based on the account's nonce
404        self.delete_discarded_blobs(outcome.discarded.iter());
405
406        // notify listeners about updates
407        self.notify_on_new_state(outcome);
408    }
409
410    /// Performs account updates on the pool.
411    ///
412    /// This will either promote or discard transactions based on the new account state.
413    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
414        let changed_senders = self.changed_senders(accounts.into_iter());
415        let UpdateOutcome { promoted, discarded } =
416            self.pool.write().update_accounts(changed_senders);
417        let mut listener = self.event_listener.write();
418
419        promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
420        discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
421
422        // This deletes outdated blob txs from the blob store, based on the account's nonce. This is
423        // called during txpool maintenance when the pool drifted.
424        self.delete_discarded_blobs(discarded.iter());
425    }
426
427    /// Add a single validated transaction into the pool.
428    ///
429    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
430    /// come in through that function, either as a batch or `std::iter::once`.
431    fn add_transaction(
432        &self,
433        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
434        origin: TransactionOrigin,
435        tx: TransactionValidationOutcome<T::Transaction>,
436    ) -> PoolResult<TxHash> {
437        match tx {
438            TransactionValidationOutcome::Valid {
439                balance,
440                state_nonce,
441                transaction,
442                propagate,
443            } => {
444                let sender_id = self.get_sender_id(transaction.sender());
445                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
446
447                // split the valid transaction and the blob sidecar if it has any
448                let (transaction, maybe_sidecar) = match transaction {
449                    ValidTransaction::Valid(tx) => (tx, None),
450                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
451                        debug_assert!(
452                            transaction.is_eip4844(),
453                            "validator returned sidecar for non EIP-4844 transaction"
454                        );
455                        (transaction, Some(sidecar))
456                    }
457                };
458
459                let tx = ValidPoolTransaction {
460                    transaction,
461                    transaction_id,
462                    propagate,
463                    timestamp: Instant::now(),
464                    origin,
465                };
466
467                let added = pool.add_transaction(tx, balance, state_nonce)?;
468                let hash = *added.hash();
469
470                // transaction was successfully inserted into the pool
471                if let Some(sidecar) = maybe_sidecar {
472                    // notify blob sidecar listeners
473                    self.on_new_blob_sidecar(&hash, &sidecar);
474                    // store the sidecar in the blob store
475                    self.insert_blob(hash, sidecar);
476                }
477
478                if let Some(replaced) = added.replaced_blob_transaction() {
479                    debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
480                    // delete the replaced transaction from the blob store
481                    self.delete_blob(replaced);
482                }
483
484                // Notify about new pending transactions
485                if let Some(pending) = added.as_pending() {
486                    self.on_new_pending_transaction(pending);
487                }
488
489                // Notify tx event listeners
490                self.notify_event_listeners(&added);
491
492                if let Some(discarded) = added.discarded_transactions() {
493                    self.delete_discarded_blobs(discarded.iter());
494                }
495
496                // Notify listeners for _all_ transactions
497                self.on_new_transaction(added.into_new_transaction_event());
498
499                Ok(hash)
500            }
501            TransactionValidationOutcome::Invalid(tx, err) => {
502                let mut listener = self.event_listener.write();
503                listener.discarded(tx.hash());
504                Err(PoolError::new(*tx.hash(), err))
505            }
506            TransactionValidationOutcome::Error(tx_hash, err) => {
507                let mut listener = self.event_listener.write();
508                listener.discarded(&tx_hash);
509                Err(PoolError::other(tx_hash, err))
510            }
511        }
512    }
513
514    /// Adds a transaction and returns the event stream.
515    pub fn add_transaction_and_subscribe(
516        &self,
517        origin: TransactionOrigin,
518        tx: TransactionValidationOutcome<T::Transaction>,
519    ) -> PoolResult<TransactionEvents> {
520        let listener = {
521            let mut listener = self.event_listener.write();
522            listener.subscribe(tx.tx_hash())
523        };
524        let mut results = self.add_transactions(origin, std::iter::once(tx));
525        results.pop().expect("result length is the same as the input")?;
526        Ok(listener)
527    }
528
529    /// Adds all transactions in the iterator to the pool, returning a list of results.
530    ///
531    /// Note: A large batch may lock the pool for a long time that blocks important operations
532    /// like updating the pool on canonical state changes. The caller should consider having
533    /// a max batch size to balance transaction insertions with other updates.
534    pub fn add_transactions(
535        &self,
536        origin: TransactionOrigin,
537        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
538    ) -> Vec<PoolResult<TxHash>> {
539        // Add the transactions and enforce the pool size limits in one write lock
540        let (mut added, discarded) = {
541            let mut pool = self.pool.write();
542            let added = transactions
543                .into_iter()
544                .map(|tx| self.add_transaction(&mut pool, origin, tx))
545                .collect::<Vec<_>>();
546
547            // Enforce the pool size limits if at least one transaction was added successfully
548            let discarded = if added.iter().any(Result::is_ok) {
549                pool.discard_worst()
550            } else {
551                Default::default()
552            };
553
554            (added, discarded)
555        };
556
557        if !discarded.is_empty() {
558            // Delete any blobs associated with discarded blob transactions
559            self.delete_discarded_blobs(discarded.iter());
560
561            let discarded_hashes =
562                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
563
564            {
565                let mut listener = self.event_listener.write();
566                discarded_hashes.iter().for_each(|hash| listener.discarded(hash));
567            }
568
569            // A newly added transaction may be immediately discarded, so we need to
570            // adjust the result here
571            for res in &mut added {
572                if let Ok(hash) = res {
573                    if discarded_hashes.contains(hash) {
574                        *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
575                    }
576                }
577            }
578        }
579
580        added
581    }
582
583    /// Notify all listeners about a new pending transaction.
584    fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
585        let propagate_allowed = pending.is_propagate_allowed();
586
587        let mut transaction_listeners = self.pending_transaction_listener.lock();
588        transaction_listeners.retain_mut(|listener| {
589            if listener.kind.is_propagate_only() && !propagate_allowed {
590                // only emit this hash to listeners that are only allowed to receive propagate only
591                // transactions, such as network
592                return !listener.sender.is_closed()
593            }
594
595            // broadcast all pending transactions to the listener
596            listener.send_all(pending.pending_transactions(listener.kind))
597        });
598    }
599
600    /// Notify all listeners about a newly inserted pending transaction.
601    fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
602        let mut transaction_listeners = self.transaction_listener.lock();
603        transaction_listeners.retain_mut(|listener| {
604            if listener.kind.is_propagate_only() && !event.transaction.propagate {
605                // only emit this hash to listeners that are only allowed to receive propagate only
606                // transactions, such as network
607                return !listener.sender.is_closed()
608            }
609
610            listener.send(event.clone())
611        });
612    }
613
614    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
615    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecar) {
616        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
617        if sidecar_listeners.is_empty() {
618            return
619        }
620        let sidecar = Arc::new(sidecar.clone());
621        sidecar_listeners.retain_mut(|listener| {
622            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
623            match listener.sender.try_send(new_blob_event) {
624                Ok(()) => true,
625                Err(err) => {
626                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
627                        debug!(
628                            target: "txpool",
629                            "[{:?}] failed to send blob sidecar; channel full",
630                            sidecar,
631                        );
632                        true
633                    } else {
634                        false
635                    }
636                }
637            }
638        })
639    }
640
641    /// Notifies transaction listeners about changes once a block was processed.
642    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
643        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
644
645        // notify about promoted pending transactions
646        // emit hashes
647        self.pending_transaction_listener
648            .lock()
649            .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
650
651        // emit full transactions
652        self.transaction_listener.lock().retain_mut(|listener| {
653            listener.send_all(outcome.full_pending_transactions(listener.kind))
654        });
655
656        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
657
658        // broadcast specific transaction events
659        let mut listener = self.event_listener.write();
660
661        mined.iter().for_each(|tx| listener.mined(tx, block_hash));
662        promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
663        discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
664    }
665
666    /// Fire events for the newly added transaction if there are any.
667    fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
668        let mut listener = self.event_listener.write();
669
670        match tx {
671            AddedTransaction::Pending(tx) => {
672                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
673
674                listener.pending(transaction.hash(), replaced.clone());
675                promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
676                discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
677            }
678            AddedTransaction::Parked { transaction, replaced, .. } => {
679                listener.queued(transaction.hash());
680                if let Some(replaced) = replaced {
681                    listener.replaced(replaced.clone(), *transaction.hash());
682                }
683            }
684        }
685    }
686
687    /// Returns an iterator that yields transactions that are ready to be included in the block.
688    pub fn best_transactions(&self) -> BestTransactions<T> {
689        self.get_pool_data().best_transactions()
690    }
691
692    /// Returns an iterator that yields transactions that are ready to be included in the block with
693    /// the given base fee and optional blob fee attributes.
694    pub fn best_transactions_with_attributes(
695        &self,
696        best_transactions_attributes: BestTransactionsAttributes,
697    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
698    {
699        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
700    }
701
702    /// Returns only the first `max` transactions in the pending pool.
703    pub fn pending_transactions_max(
704        &self,
705        max: usize,
706    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
707        self.get_pool_data().pending_transactions_iter().take(max).collect()
708    }
709
710    /// Returns all transactions from the pending sub-pool
711    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
712        self.get_pool_data().pending_transactions()
713    }
714
715    /// Returns all transactions from parked pools
716    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
717        self.get_pool_data().queued_transactions()
718    }
719
720    /// Returns all transactions in the pool
721    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
722        let pool = self.get_pool_data();
723        AllPoolTransactions {
724            pending: pool.pending_transactions(),
725            queued: pool.queued_transactions(),
726        }
727    }
728
729    /// Removes and returns all matching transactions from the pool.
730    pub fn remove_transactions(
731        &self,
732        hashes: Vec<TxHash>,
733    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
734        if hashes.is_empty() {
735            return Vec::new()
736        }
737        let removed = self.pool.write().remove_transactions(hashes);
738
739        let mut listener = self.event_listener.write();
740
741        removed.iter().for_each(|tx| listener.discarded(tx.hash()));
742
743        removed
744    }
745
746    /// Removes and returns all matching transactions and their dependent transactions from the
747    /// pool.
748    pub fn remove_transactions_and_descendants(
749        &self,
750        hashes: Vec<TxHash>,
751    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
752        if hashes.is_empty() {
753            return Vec::new()
754        }
755        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
756
757        let mut listener = self.event_listener.write();
758
759        removed.iter().for_each(|tx| listener.discarded(tx.hash()));
760
761        removed
762    }
763
764    /// Removes and returns all transactions by the specified sender from the pool.
765    pub fn remove_transactions_by_sender(
766        &self,
767        sender: Address,
768    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
769        let sender_id = self.get_sender_id(sender);
770        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
771
772        let mut listener = self.event_listener.write();
773
774        removed.iter().for_each(|tx| listener.discarded(tx.hash()));
775
776        removed
777    }
778
779    /// Removes and returns all transactions that are present in the pool.
780    pub fn retain_unknown<A>(&self, announcement: &mut A)
781    where
782        A: HandleMempoolData,
783    {
784        if announcement.is_empty() {
785            return
786        }
787        let pool = self.get_pool_data();
788        announcement.retain_by_hash(|tx| !pool.contains(tx))
789    }
790
791    /// Returns the transaction by hash.
792    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
793        self.get_pool_data().get(tx_hash)
794    }
795
796    /// Returns all transactions of the address
797    pub fn get_transactions_by_sender(
798        &self,
799        sender: Address,
800    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
801        let sender_id = self.get_sender_id(sender);
802        self.get_pool_data().get_transactions_by_sender(sender_id)
803    }
804
805    /// Returns all queued transactions of the address by sender
806    pub fn get_queued_transactions_by_sender(
807        &self,
808        sender: Address,
809    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
810        let sender_id = self.get_sender_id(sender);
811        self.get_pool_data().queued_txs_by_sender(sender_id)
812    }
813
814    /// Returns all pending transactions filtered by predicate
815    pub fn pending_transactions_with_predicate(
816        &self,
817        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
818    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
819        self.get_pool_data().pending_transactions_with_predicate(predicate)
820    }
821
822    /// Returns all pending transactions of the address by sender
823    pub fn get_pending_transactions_by_sender(
824        &self,
825        sender: Address,
826    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
827        let sender_id = self.get_sender_id(sender);
828        self.get_pool_data().pending_txs_by_sender(sender_id)
829    }
830
831    /// Returns the highest transaction of the address
832    pub fn get_highest_transaction_by_sender(
833        &self,
834        sender: Address,
835    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
836        let sender_id = self.get_sender_id(sender);
837        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
838    }
839
840    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
841    pub fn get_highest_consecutive_transaction_by_sender(
842        &self,
843        sender: Address,
844        on_chain_nonce: u64,
845    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
846        let sender_id = self.get_sender_id(sender);
847        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
848            sender_id.into_transaction_id(on_chain_nonce),
849        )
850    }
851
852    /// Returns the transaction given a [`TransactionId`]
853    pub fn get_transaction_by_transaction_id(
854        &self,
855        transaction_id: &TransactionId,
856    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
857        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
858    }
859
860    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
861    pub fn get_transactions_by_origin(
862        &self,
863        origin: TransactionOrigin,
864    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
865        self.get_pool_data()
866            .all()
867            .transactions_iter()
868            .filter(|tx| tx.origin == origin)
869            .cloned()
870            .collect()
871    }
872
873    /// Returns all pending transactions filted by [`TransactionOrigin`]
874    pub fn get_pending_transactions_by_origin(
875        &self,
876        origin: TransactionOrigin,
877    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
878        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
879    }
880
881    /// Returns all the transactions belonging to the hashes.
882    ///
883    /// If no transaction exists, it is skipped.
884    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
885        if txs.is_empty() {
886            return Vec::new()
887        }
888        self.get_pool_data().get_all(txs).collect()
889    }
890
891    /// Notify about propagated transactions.
892    pub fn on_propagated(&self, txs: PropagatedTransactions) {
893        if txs.0.is_empty() {
894            return
895        }
896        let mut listener = self.event_listener.write();
897
898        txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers))
899    }
900
901    /// Number of transactions in the entire pool
902    pub fn len(&self) -> usize {
903        self.get_pool_data().len()
904    }
905
906    /// Whether the pool is empty
907    pub fn is_empty(&self) -> bool {
908        self.get_pool_data().is_empty()
909    }
910
911    /// Returns whether or not the pool is over its configured size and transaction count limits.
912    pub fn is_exceeded(&self) -> bool {
913        self.pool.read().is_exceeded()
914    }
915
916    /// Inserts a blob transaction into the blob store
917    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
918        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
919        if let Err(err) = self.blob_store.insert(hash, blob) {
920            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
921            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
922        }
923        self.update_blob_store_metrics();
924    }
925
926    /// Delete a blob from the blob store
927    pub fn delete_blob(&self, blob: TxHash) {
928        let _ = self.blob_store.delete(blob);
929    }
930
931    /// Delete all blobs from the blob store
932    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
933        let _ = self.blob_store.delete_all(txs);
934    }
935
936    /// Cleans up the blob store
937    pub fn cleanup_blobs(&self) {
938        let stat = self.blob_store.cleanup();
939        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
940        self.update_blob_store_metrics();
941    }
942
943    fn update_blob_store_metrics(&self) {
944        if let Some(data_size) = self.blob_store.data_size_hint() {
945            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
946        }
947        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
948    }
949
950    /// Deletes all blob transactions that were discarded.
951    fn delete_discarded_blobs<'a>(
952        &'a self,
953        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
954    ) {
955        let blob_txs = transactions
956            .into_iter()
957            .filter(|tx| tx.transaction.is_eip4844())
958            .map(|tx| *tx.hash())
959            .collect();
960        self.delete_blobs(blob_txs);
961    }
962}
963
964impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
965    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
966        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
967    }
968}
969
970/// An active listener for new pending transactions.
971#[derive(Debug)]
972struct PendingTransactionHashListener {
973    sender: mpsc::Sender<TxHash>,
974    /// Whether to include transactions that should not be propagated over the network.
975    kind: TransactionListenerKind,
976}
977
978impl PendingTransactionHashListener {
979    /// Attempts to send all hashes to the listener.
980    ///
981    /// Returns false if the channel is closed (receiver dropped)
982    fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
983        for tx_hash in hashes {
984            match self.sender.try_send(tx_hash) {
985                Ok(()) => {}
986                Err(err) => {
987                    return if matches!(err, mpsc::error::TrySendError::Full(_)) {
988                        debug!(
989                            target: "txpool",
990                            "[{:?}] failed to send pending tx; channel full",
991                            tx_hash,
992                        );
993                        true
994                    } else {
995                        false
996                    }
997                }
998            }
999        }
1000        true
1001    }
1002}
1003
1004/// An active listener for new pending transactions.
1005#[derive(Debug)]
1006struct TransactionListener<T: PoolTransaction> {
1007    sender: mpsc::Sender<NewTransactionEvent<T>>,
1008    /// Whether to include transactions that should not be propagated over the network.
1009    kind: TransactionListenerKind,
1010}
1011
1012impl<T: PoolTransaction> TransactionListener<T> {
1013    /// Attempts to send the event to the listener.
1014    ///
1015    /// Returns false if the channel is closed (receiver dropped)
1016    fn send(&self, event: NewTransactionEvent<T>) -> bool {
1017        self.send_all(std::iter::once(event))
1018    }
1019
1020    /// Attempts to send all events to the listener.
1021    ///
1022    /// Returns false if the channel is closed (receiver dropped)
1023    fn send_all(&self, events: impl IntoIterator<Item = NewTransactionEvent<T>>) -> bool {
1024        for event in events {
1025            match self.sender.try_send(event) {
1026                Ok(()) => {}
1027                Err(err) => {
1028                    return if let mpsc::error::TrySendError::Full(event) = err {
1029                        debug!(
1030                            target: "txpool",
1031                            "[{:?}] failed to send pending tx; channel full",
1032                            event.transaction.hash(),
1033                        );
1034                        true
1035                    } else {
1036                        false
1037                    }
1038                }
1039            }
1040        }
1041        true
1042    }
1043}
1044
1045/// An active listener for new blobs
1046#[derive(Debug)]
1047struct BlobTransactionSidecarListener {
1048    sender: mpsc::Sender<NewBlobSidecar>,
1049}
1050
1051/// Tracks an added transaction and all graph changes caused by adding it.
1052#[derive(Debug, Clone)]
1053pub struct AddedPendingTransaction<T: PoolTransaction> {
1054    /// Inserted transaction.
1055    transaction: Arc<ValidPoolTransaction<T>>,
1056    /// Replaced transaction.
1057    replaced: Option<Arc<ValidPoolTransaction<T>>>,
1058    /// transactions promoted to the pending queue
1059    promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1060    /// transactions that failed and became discarded
1061    discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1062}
1063
1064impl<T: PoolTransaction> AddedPendingTransaction<T> {
1065    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1066    /// [`TransactionListenerKind`].
1067    ///
1068    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1069    /// are allowed to be propagated are returned.
1070    pub(crate) fn pending_transactions(
1071        &self,
1072        kind: TransactionListenerKind,
1073    ) -> impl Iterator<Item = B256> + '_ {
1074        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1075        PendingTransactionIter { kind, iter }
1076    }
1077
1078    /// Returns if the transaction should be propagated.
1079    pub(crate) fn is_propagate_allowed(&self) -> bool {
1080        self.transaction.propagate
1081    }
1082}
1083
1084pub(crate) struct PendingTransactionIter<Iter> {
1085    kind: TransactionListenerKind,
1086    iter: Iter,
1087}
1088
1089impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1090where
1091    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1092    T: PoolTransaction + 'a,
1093{
1094    type Item = B256;
1095
1096    fn next(&mut self) -> Option<Self::Item> {
1097        loop {
1098            let next = self.iter.next()?;
1099            if self.kind.is_propagate_only() && !next.propagate {
1100                continue
1101            }
1102            return Some(*next.hash())
1103        }
1104    }
1105}
1106
1107/// An iterator over full pending transactions
1108pub(crate) struct FullPendingTransactionIter<Iter> {
1109    kind: TransactionListenerKind,
1110    iter: Iter,
1111}
1112
1113impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1114where
1115    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1116    T: PoolTransaction + 'a,
1117{
1118    type Item = NewTransactionEvent<T>;
1119
1120    fn next(&mut self) -> Option<Self::Item> {
1121        loop {
1122            let next = self.iter.next()?;
1123            if self.kind.is_propagate_only() && !next.propagate {
1124                continue
1125            }
1126            return Some(NewTransactionEvent {
1127                subpool: SubPool::Pending,
1128                transaction: next.clone(),
1129            })
1130        }
1131    }
1132}
1133
1134/// Represents a transaction that was added into the pool and its state
1135#[derive(Debug, Clone)]
1136pub enum AddedTransaction<T: PoolTransaction> {
1137    /// Transaction was successfully added and moved to the pending pool.
1138    Pending(AddedPendingTransaction<T>),
1139    /// Transaction was successfully added but not yet ready for processing and moved to a
1140    /// parked pool instead.
1141    Parked {
1142        /// Inserted transaction.
1143        transaction: Arc<ValidPoolTransaction<T>>,
1144        /// Replaced transaction.
1145        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1146        /// The subpool it was moved to.
1147        subpool: SubPool,
1148    },
1149}
1150
1151impl<T: PoolTransaction> AddedTransaction<T> {
1152    /// Returns whether the transaction has been added to the pending pool.
1153    pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1154        match self {
1155            Self::Pending(tx) => Some(tx),
1156            _ => None,
1157        }
1158    }
1159
1160    /// Returns the replaced transaction if there was one
1161    pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1162        match self {
1163            Self::Pending(tx) => tx.replaced.as_ref(),
1164            Self::Parked { replaced, .. } => replaced.as_ref(),
1165        }
1166    }
1167
1168    /// Returns the discarded transactions if there were any
1169    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1170        match self {
1171            Self::Pending(tx) => Some(&tx.discarded),
1172            Self::Parked { .. } => None,
1173        }
1174    }
1175
1176    /// Returns the hash of the replaced transaction if it is a blob transaction.
1177    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1178        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1179    }
1180
1181    /// Returns the hash of the transaction
1182    pub(crate) fn hash(&self) -> &TxHash {
1183        match self {
1184            Self::Pending(tx) => tx.transaction.hash(),
1185            Self::Parked { transaction, .. } => transaction.hash(),
1186        }
1187    }
1188
1189    /// Converts this type into the event type for listeners
1190    pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1191        match self {
1192            Self::Pending(tx) => {
1193                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1194            }
1195            Self::Parked { transaction, subpool, .. } => {
1196                NewTransactionEvent { transaction, subpool }
1197            }
1198        }
1199    }
1200
1201    /// Returns the subpool this transaction was added to
1202    #[cfg(test)]
1203    pub(crate) const fn subpool(&self) -> SubPool {
1204        match self {
1205            Self::Pending(_) => SubPool::Pending,
1206            Self::Parked { subpool, .. } => *subpool,
1207        }
1208    }
1209
1210    /// Returns the [`TransactionId`] of the added transaction
1211    #[cfg(test)]
1212    pub(crate) fn id(&self) -> &TransactionId {
1213        match self {
1214            Self::Pending(added) => added.transaction.id(),
1215            Self::Parked { transaction, .. } => transaction.id(),
1216        }
1217    }
1218}
1219
1220/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1221#[derive(Debug)]
1222pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1223    /// Hash of the block.
1224    pub(crate) block_hash: B256,
1225    /// All mined transactions.
1226    pub(crate) mined: Vec<TxHash>,
1227    /// Transactions promoted to the pending pool.
1228    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1229    /// transaction that were discarded during the update
1230    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1231}
1232
1233impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1234    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1235    /// [`TransactionListenerKind`].
1236    ///
1237    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1238    /// are allowed to be propagated are returned.
1239    pub(crate) fn pending_transactions(
1240        &self,
1241        kind: TransactionListenerKind,
1242    ) -> impl Iterator<Item = B256> + '_ {
1243        let iter = self.promoted.iter();
1244        PendingTransactionIter { kind, iter }
1245    }
1246
1247    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1248    /// [`TransactionListenerKind`].
1249    ///
1250    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1251    /// are allowed to be propagated are returned.
1252    pub(crate) fn full_pending_transactions(
1253        &self,
1254        kind: TransactionListenerKind,
1255    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1256        let iter = self.promoted.iter();
1257        FullPendingTransactionIter { kind, iter }
1258    }
1259}
1260
1261#[cfg(test)]
1262mod tests {
1263    use crate::{
1264        blobstore::{BlobStore, InMemoryBlobStore},
1265        test_utils::{MockTransaction, TestPoolBuilder},
1266        validate::ValidTransaction,
1267        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1268    };
1269    use alloy_eips::eip4844::BlobTransactionSidecar;
1270    use std::{fs, path::PathBuf};
1271
1272    #[test]
1273    fn test_discard_blobs_on_blob_tx_eviction() {
1274        let blobs = {
1275            // Read the contents of the JSON file into a string.
1276            let json_content = fs::read_to_string(
1277                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1278            )
1279            .expect("Failed to read the blob data file");
1280
1281            // Parse the JSON contents into a serde_json::Value.
1282            let json_value: serde_json::Value =
1283                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1284
1285            // Extract blob data from JSON and convert it to Blob.
1286            vec![
1287                // Extract the "data" field from the JSON and parse it as a string.
1288                json_value
1289                    .get("data")
1290                    .unwrap()
1291                    .as_str()
1292                    .expect("Data is not a valid string")
1293                    .to_string(),
1294            ]
1295        };
1296
1297        // Generate a BlobTransactionSidecar from the blobs.
1298        let sidecar = BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap();
1299
1300        // Define the maximum limit for blobs in the sub-pool.
1301        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1302
1303        // Create a test pool with default configuration and the specified blob limit.
1304        let test_pool = &TestPoolBuilder::default()
1305            .with_config(PoolConfig { blob_limit, ..Default::default() })
1306            .pool;
1307
1308        // Set the block info for the pool, including a pending blob fee.
1309        test_pool
1310            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1311
1312        // Create an in-memory blob store.
1313        let blob_store = InMemoryBlobStore::default();
1314
1315        // Loop to add transactions to the pool and test blob eviction.
1316        for n in 0..blob_limit.max_txs + 10 {
1317            // Create a mock transaction with the generated blob sidecar.
1318            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1319
1320            // Set non zero size
1321            tx.set_size(1844674407370951);
1322
1323            // Insert the sidecar into the blob store if the current index is within the blob limit.
1324            if n < blob_limit.max_txs {
1325                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1326            }
1327
1328            // Add the transaction to the pool with external origin and valid outcome.
1329            test_pool.add_transactions(
1330                TransactionOrigin::External,
1331                [TransactionValidationOutcome::Valid {
1332                    balance: U256::from(1_000),
1333                    state_nonce: 0,
1334                    transaction: ValidTransaction::ValidWithSidecar {
1335                        transaction: tx,
1336                        sidecar: sidecar.clone(),
1337                    },
1338                    propagate: true,
1339                }],
1340            );
1341        }
1342
1343        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
1344        assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1345
1346        // Assert that the size of the pool's blob_size component matches the expected value.
1347        assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1348
1349        // Assert that the pool's blob store matches the expected blob store.
1350        assert_eq!(*test_pool.blob_store(), blob_store);
1351    }
1352}