1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use best::BestTransactions;
93use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
94use reth_eth_wire_types::HandleMempoolData;
95use reth_execution_types::ChangedAccount;
96
97use alloy_eips::{eip4844::BlobTransactionSidecar, Typed2718};
98use reth_primitives_traits::Recovered;
99use rustc_hash::FxHashMap;
100use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
101use tokio::sync::mpsc;
102use tracing::{debug, trace, warn};
103mod events;
104pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
105pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
106pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
107pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
108pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
109pub use pending::PendingPool;
110use reth_primitives_traits::Block;
111
112mod best;
113mod blob;
114mod listener;
115mod parked;
116pub(crate) mod pending;
117pub(crate) mod size;
118pub(crate) mod state;
119pub mod txpool;
120mod update;
121
/// Buffer size constant for pending-transaction hash listeners.
///
/// NOTE(review): the channel created in `PoolInner::add_pending_listener` is sized from
/// `PoolConfig::pending_tx_listener_buffer_size`; presumably this constant is that field's
/// default — confirm against `PoolConfig`.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2_048;
/// Buffer size constant for new-transaction listeners.
///
/// NOTE(review): the channel created in `PoolInner::add_new_transaction_listener` is sized from
/// `PoolConfig::new_tx_listener_buffer_size`; presumably this constant is that field's default —
/// confirm against `PoolConfig`.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1_024;

/// Capacity of the bounded channel handed out by `PoolInner::add_blob_sidecar_listener`.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
128
129pub struct PoolInner<V, T, S>
131where
132 T: TransactionOrdering,
133{
134 identifiers: RwLock<SenderIdentifiers>,
136 validator: V,
138 blob_store: S,
140 pool: RwLock<TxPool<T>>,
142 config: PoolConfig,
144 event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
146 pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
148 transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
150 blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
152 blob_store_metrics: BlobStoreMetrics,
154}
155
156impl<V, T, S> PoolInner<V, T, S>
159where
160 V: TransactionValidator,
161 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
162 S: BlobStore,
163{
164 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
166 Self {
167 identifiers: Default::default(),
168 validator,
169 event_listener: Default::default(),
170 pool: RwLock::new(TxPool::new(ordering, config.clone())),
171 pending_transaction_listener: Default::default(),
172 transaction_listener: Default::default(),
173 blob_transaction_sidecar_listener: Default::default(),
174 config,
175 blob_store,
176 blob_store_metrics: Default::default(),
177 }
178 }
179
180 pub const fn blob_store(&self) -> &S {
182 &self.blob_store
183 }
184
185 pub fn size(&self) -> PoolSize {
187 self.get_pool_data().size()
188 }
189
190 pub fn block_info(&self) -> BlockInfo {
192 self.get_pool_data().block_info()
193 }
194 pub fn set_block_info(&self, info: BlockInfo) {
196 self.pool.write().set_block_info(info)
197 }
198
199 pub fn get_sender_id(&self, addr: Address) -> SenderId {
201 self.identifiers.write().sender_id_or_create(addr)
202 }
203
204 pub fn unique_senders(&self) -> HashSet<Address> {
206 self.get_pool_data().unique_senders()
207 }
208
209 fn changed_senders(
212 &self,
213 accs: impl Iterator<Item = ChangedAccount>,
214 ) -> FxHashMap<SenderId, SenderInfo> {
215 let mut identifiers = self.identifiers.write();
216 accs.into_iter()
217 .map(|acc| {
218 let ChangedAccount { address, nonce, balance } = acc;
219 let sender_id = identifiers.sender_id_or_create(address);
220 (sender_id, SenderInfo { state_nonce: nonce, balance })
221 })
222 .collect()
223 }
224
225 pub const fn config(&self) -> &PoolConfig {
227 &self.config
228 }
229
230 pub const fn validator(&self) -> &V {
232 &self.validator
233 }
234
235 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
238 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
239 let listener = PendingTransactionHashListener { sender, kind };
240 self.pending_transaction_listener.lock().push(listener);
241 rx
242 }
243
244 pub fn add_new_transaction_listener(
246 &self,
247 kind: TransactionListenerKind,
248 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
249 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
250 let listener = TransactionListener { sender, kind };
251 self.transaction_listener.lock().push(listener);
252 rx
253 }
254 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
257 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
258 let listener = BlobTransactionSidecarListener { sender };
259 self.blob_transaction_sidecar_listener.lock().push(listener);
260 rx
261 }
262
263 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
266 self.get_pool_data()
267 .contains(&tx_hash)
268 .then(|| self.event_listener.write().subscribe(tx_hash))
269 }
270
271 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
273 self.event_listener.write().subscribe_all()
274 }
275
276 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
278 self.pool.read()
279 }
280
281 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
283 self.get_pool_data()
284 .all()
285 .transactions_iter()
286 .filter(|tx| tx.propagate)
287 .map(|tx| *tx.hash())
288 .collect()
289 }
290
291 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
293 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
294 }
295
296 pub fn pooled_transactions_max(
298 &self,
299 max: usize,
300 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
301 self.get_pool_data()
302 .all()
303 .transactions_iter()
304 .filter(|tx| tx.propagate)
305 .take(max)
306 .cloned()
307 .collect()
308 }
309
310 fn to_pooled_transaction(
315 &self,
316 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
317 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
318 where
319 <V as TransactionValidator>::Transaction: EthPoolTransaction,
320 {
321 if transaction.is_eip4844() {
322 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
323 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
324 } else {
325 transaction
326 .transaction
327 .clone()
328 .try_into_pooled()
329 .inspect_err(|err| {
330 debug!(
331 target: "txpool", %err,
332 "failed to convert transaction to pooled element; skipping",
333 );
334 })
335 .ok()
336 }
337 }
338
339 pub fn get_pooled_transaction_elements(
341 &self,
342 tx_hashes: Vec<TxHash>,
343 limit: GetPooledTransactionLimit,
344 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
345 where
346 <V as TransactionValidator>::Transaction: EthPoolTransaction,
347 {
348 let transactions = self.get_all(tx_hashes);
349 let mut elements = Vec::with_capacity(transactions.len());
350 let mut size = 0;
351 for transaction in transactions {
352 let encoded_len = transaction.encoded_length();
353 let Some(pooled) = self.to_pooled_transaction(transaction) else {
354 continue;
355 };
356
357 size += encoded_len;
358 elements.push(pooled.into_inner());
359
360 if limit.exceeds(size) {
361 break
362 }
363 }
364
365 elements
366 }
367
368 pub fn get_pooled_transaction_element(
370 &self,
371 tx_hash: TxHash,
372 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
373 where
374 <V as TransactionValidator>::Transaction: EthPoolTransaction,
375 {
376 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
377 }
378
379 pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
381 where
382 B: Block,
383 {
384 trace!(target: "txpool", ?update, "updating pool on canonical state change");
385
386 let block_info = update.block_info();
387 let CanonicalStateUpdate {
388 new_tip, changed_accounts, mined_transactions, update_kind, ..
389 } = update;
390 self.validator.on_new_head_block(new_tip);
391
392 let changed_senders = self.changed_senders(changed_accounts.into_iter());
393
394 let outcome = self.pool.write().on_canonical_state_change(
396 block_info,
397 mined_transactions,
398 changed_senders,
399 update_kind,
400 );
401
402 self.delete_discarded_blobs(outcome.discarded.iter());
404
405 self.notify_on_new_state(outcome);
407 }
408
409 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
413 let changed_senders = self.changed_senders(accounts.into_iter());
414 let UpdateOutcome { promoted, discarded } =
415 self.pool.write().update_accounts(changed_senders);
416 let mut listener = self.event_listener.write();
417
418 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
419 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
420
421 self.delete_discarded_blobs(discarded.iter());
424 }
425
426 fn add_transaction(
431 &self,
432 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
433 origin: TransactionOrigin,
434 tx: TransactionValidationOutcome<T::Transaction>,
435 ) -> PoolResult<TxHash> {
436 match tx {
437 TransactionValidationOutcome::Valid {
438 balance,
439 state_nonce,
440 transaction,
441 propagate,
442 } => {
443 let sender_id = self.get_sender_id(transaction.sender());
444 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
445
446 let (transaction, maybe_sidecar) = match transaction {
448 ValidTransaction::Valid(tx) => (tx, None),
449 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
450 debug_assert!(
451 transaction.is_eip4844(),
452 "validator returned sidecar for non EIP-4844 transaction"
453 );
454 (transaction, Some(sidecar))
455 }
456 };
457
458 let tx = ValidPoolTransaction {
459 transaction,
460 transaction_id,
461 propagate,
462 timestamp: Instant::now(),
463 origin,
464 };
465
466 let added = pool.add_transaction(tx, balance, state_nonce)?;
467 let hash = *added.hash();
468
469 if let Some(sidecar) = maybe_sidecar {
471 self.on_new_blob_sidecar(&hash, &sidecar);
473 self.insert_blob(hash, sidecar);
475 }
476
477 if let Some(replaced) = added.replaced_blob_transaction() {
478 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
479 self.delete_blob(replaced);
481 }
482
483 if let Some(pending) = added.as_pending() {
485 self.on_new_pending_transaction(pending);
486 }
487
488 self.notify_event_listeners(&added);
490
491 if let Some(discarded) = added.discarded_transactions() {
492 self.delete_discarded_blobs(discarded.iter());
493 }
494
495 self.on_new_transaction(added.into_new_transaction_event());
497
498 Ok(hash)
499 }
500 TransactionValidationOutcome::Invalid(tx, err) => {
501 let mut listener = self.event_listener.write();
502 listener.invalid(tx.hash());
503 Err(PoolError::new(*tx.hash(), err))
504 }
505 TransactionValidationOutcome::Error(tx_hash, err) => {
506 let mut listener = self.event_listener.write();
507 listener.discarded(&tx_hash);
508 Err(PoolError::other(tx_hash, err))
509 }
510 }
511 }
512
513 pub fn add_transaction_and_subscribe(
515 &self,
516 origin: TransactionOrigin,
517 tx: TransactionValidationOutcome<T::Transaction>,
518 ) -> PoolResult<TransactionEvents> {
519 let listener = {
520 let mut listener = self.event_listener.write();
521 listener.subscribe(tx.tx_hash())
522 };
523 let mut results = self.add_transactions(origin, std::iter::once(tx));
524 results.pop().expect("result length is the same as the input")?;
525 Ok(listener)
526 }
527
528 pub fn add_transactions(
534 &self,
535 origin: TransactionOrigin,
536 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
537 ) -> Vec<PoolResult<TxHash>> {
538 let (mut added, discarded) = {
540 let mut pool = self.pool.write();
541 let added = transactions
542 .into_iter()
543 .map(|tx| self.add_transaction(&mut pool, origin, tx))
544 .collect::<Vec<_>>();
545
546 let discarded = if added.iter().any(Result::is_ok) {
548 pool.discard_worst()
549 } else {
550 Default::default()
551 };
552
553 (added, discarded)
554 };
555
556 if !discarded.is_empty() {
557 self.delete_discarded_blobs(discarded.iter());
559
560 let discarded_hashes =
561 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
562
563 {
564 let mut listener = self.event_listener.write();
565 discarded_hashes.iter().for_each(|hash| listener.discarded(hash));
566 }
567
568 for res in &mut added {
571 if let Ok(hash) = res {
572 if discarded_hashes.contains(hash) {
573 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
574 }
575 }
576 }
577 }
578
579 added
580 }
581
582 fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
584 let propagate_allowed = pending.is_propagate_allowed();
585
586 let mut transaction_listeners = self.pending_transaction_listener.lock();
587 transaction_listeners.retain_mut(|listener| {
588 if listener.kind.is_propagate_only() && !propagate_allowed {
589 return !listener.sender.is_closed()
592 }
593
594 listener.send_all(pending.pending_transactions(listener.kind))
596 });
597 }
598
599 fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
601 let mut transaction_listeners = self.transaction_listener.lock();
602 transaction_listeners.retain_mut(|listener| {
603 if listener.kind.is_propagate_only() && !event.transaction.propagate {
604 return !listener.sender.is_closed()
607 }
608
609 listener.send(event.clone())
610 });
611 }
612
613 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecar) {
615 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
616 if sidecar_listeners.is_empty() {
617 return
618 }
619 let sidecar = Arc::new(sidecar.clone());
620 sidecar_listeners.retain_mut(|listener| {
621 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
622 match listener.sender.try_send(new_blob_event) {
623 Ok(()) => true,
624 Err(err) => {
625 if matches!(err, mpsc::error::TrySendError::Full(_)) {
626 debug!(
627 target: "txpool",
628 "[{:?}] failed to send blob sidecar; channel full",
629 sidecar,
630 );
631 true
632 } else {
633 false
634 }
635 }
636 }
637 })
638 }
639
640 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
642 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
643
644 self.pending_transaction_listener
647 .lock()
648 .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
649
650 self.transaction_listener.lock().retain_mut(|listener| {
652 listener.send_all(outcome.full_pending_transactions(listener.kind))
653 });
654
655 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
656
657 let mut listener = self.event_listener.write();
659
660 mined.iter().for_each(|tx| listener.mined(tx, block_hash));
661 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
662 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
663 }
664
665 fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
667 let mut listener = self.event_listener.write();
668
669 match tx {
670 AddedTransaction::Pending(tx) => {
671 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
672
673 listener.pending(transaction.hash(), replaced.clone());
674 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
675 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
676 }
677 AddedTransaction::Parked { transaction, replaced, .. } => {
678 listener.queued(transaction.hash());
679 if let Some(replaced) = replaced {
680 listener.replaced(replaced.clone(), *transaction.hash());
681 }
682 }
683 }
684 }
685
686 pub fn best_transactions(&self) -> BestTransactions<T> {
688 self.get_pool_data().best_transactions()
689 }
690
691 pub fn best_transactions_with_attributes(
694 &self,
695 best_transactions_attributes: BestTransactionsAttributes,
696 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
697 {
698 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
699 }
700
701 pub fn pending_transactions_max(
703 &self,
704 max: usize,
705 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
706 self.get_pool_data().pending_transactions_iter().take(max).collect()
707 }
708
709 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
711 self.get_pool_data().pending_transactions()
712 }
713
714 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
716 self.get_pool_data().queued_transactions()
717 }
718
719 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
721 let pool = self.get_pool_data();
722 AllPoolTransactions {
723 pending: pool.pending_transactions(),
724 queued: pool.queued_transactions(),
725 }
726 }
727
728 pub fn remove_transactions(
733 &self,
734 hashes: Vec<TxHash>,
735 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
736 if hashes.is_empty() {
737 return Vec::new()
738 }
739 let removed = self.pool.write().remove_transactions(hashes);
740
741 let mut listener = self.event_listener.write();
742
743 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
744
745 removed
746 }
747
748 pub fn remove_transactions_and_descendants(
751 &self,
752 hashes: Vec<TxHash>,
753 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
754 if hashes.is_empty() {
755 return Vec::new()
756 }
757 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
758
759 let mut listener = self.event_listener.write();
760
761 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
762
763 removed
764 }
765
766 pub fn remove_transactions_by_sender(
768 &self,
769 sender: Address,
770 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
771 let sender_id = self.get_sender_id(sender);
772 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
773
774 let mut listener = self.event_listener.write();
775
776 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
777
778 removed
779 }
780
781 pub fn retain_unknown<A>(&self, announcement: &mut A)
783 where
784 A: HandleMempoolData,
785 {
786 if announcement.is_empty() {
787 return
788 }
789 let pool = self.get_pool_data();
790 announcement.retain_by_hash(|tx| !pool.contains(tx))
791 }
792
793 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
795 self.get_pool_data().get(tx_hash)
796 }
797
798 pub fn get_transactions_by_sender(
800 &self,
801 sender: Address,
802 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
803 let sender_id = self.get_sender_id(sender);
804 self.get_pool_data().get_transactions_by_sender(sender_id)
805 }
806
807 pub fn get_queued_transactions_by_sender(
809 &self,
810 sender: Address,
811 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
812 let sender_id = self.get_sender_id(sender);
813 self.get_pool_data().queued_txs_by_sender(sender_id)
814 }
815
816 pub fn pending_transactions_with_predicate(
818 &self,
819 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
820 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
821 self.get_pool_data().pending_transactions_with_predicate(predicate)
822 }
823
824 pub fn get_pending_transactions_by_sender(
826 &self,
827 sender: Address,
828 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
829 let sender_id = self.get_sender_id(sender);
830 self.get_pool_data().pending_txs_by_sender(sender_id)
831 }
832
833 pub fn get_highest_transaction_by_sender(
835 &self,
836 sender: Address,
837 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
838 let sender_id = self.get_sender_id(sender);
839 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
840 }
841
842 pub fn get_highest_consecutive_transaction_by_sender(
844 &self,
845 sender: Address,
846 on_chain_nonce: u64,
847 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
848 let sender_id = self.get_sender_id(sender);
849 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
850 sender_id.into_transaction_id(on_chain_nonce),
851 )
852 }
853
854 pub fn get_transaction_by_transaction_id(
856 &self,
857 transaction_id: &TransactionId,
858 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
859 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
860 }
861
862 pub fn get_transactions_by_origin(
864 &self,
865 origin: TransactionOrigin,
866 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
867 self.get_pool_data()
868 .all()
869 .transactions_iter()
870 .filter(|tx| tx.origin == origin)
871 .cloned()
872 .collect()
873 }
874
875 pub fn get_pending_transactions_by_origin(
877 &self,
878 origin: TransactionOrigin,
879 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
880 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
881 }
882
883 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
887 if txs.is_empty() {
888 return Vec::new()
889 }
890 self.get_pool_data().get_all(txs).collect()
891 }
892
893 pub fn on_propagated(&self, txs: PropagatedTransactions) {
895 if txs.0.is_empty() {
896 return
897 }
898 let mut listener = self.event_listener.write();
899
900 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers))
901 }
902
903 pub fn len(&self) -> usize {
905 self.get_pool_data().len()
906 }
907
908 pub fn is_empty(&self) -> bool {
910 self.get_pool_data().is_empty()
911 }
912
913 pub fn is_exceeded(&self) -> bool {
915 self.pool.read().is_exceeded()
916 }
917
918 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
920 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
921 if let Err(err) = self.blob_store.insert(hash, blob) {
922 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
923 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
924 }
925 self.update_blob_store_metrics();
926 }
927
928 pub fn delete_blob(&self, blob: TxHash) {
930 let _ = self.blob_store.delete(blob);
931 }
932
933 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
935 let _ = self.blob_store.delete_all(txs);
936 }
937
938 pub fn cleanup_blobs(&self) {
940 let stat = self.blob_store.cleanup();
941 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
942 self.update_blob_store_metrics();
943 }
944
945 fn update_blob_store_metrics(&self) {
946 if let Some(data_size) = self.blob_store.data_size_hint() {
947 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
948 }
949 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
950 }
951
952 fn delete_discarded_blobs<'a>(
954 &'a self,
955 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
956 ) {
957 let blob_txs = transactions
958 .into_iter()
959 .filter(|tx| tx.transaction.is_eip4844())
960 .map(|tx| *tx.hash())
961 .collect();
962 self.delete_blobs(blob_txs);
963 }
964}
965
966impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
967 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
968 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
969 }
970}
971
972#[derive(Debug, Clone)]
974pub struct AddedPendingTransaction<T: PoolTransaction> {
975 transaction: Arc<ValidPoolTransaction<T>>,
977 replaced: Option<Arc<ValidPoolTransaction<T>>>,
979 promoted: Vec<Arc<ValidPoolTransaction<T>>>,
981 discarded: Vec<Arc<ValidPoolTransaction<T>>>,
983}
984
985impl<T: PoolTransaction> AddedPendingTransaction<T> {
986 pub(crate) fn pending_transactions(
992 &self,
993 kind: TransactionListenerKind,
994 ) -> impl Iterator<Item = B256> + '_ {
995 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
996 PendingTransactionIter { kind, iter }
997 }
998
999 pub(crate) fn is_propagate_allowed(&self) -> bool {
1001 self.transaction.propagate
1002 }
1003}
1004
1005pub(crate) struct PendingTransactionIter<Iter> {
1006 kind: TransactionListenerKind,
1007 iter: Iter,
1008}
1009
1010impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1011where
1012 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1013 T: PoolTransaction + 'a,
1014{
1015 type Item = B256;
1016
1017 fn next(&mut self) -> Option<Self::Item> {
1018 loop {
1019 let next = self.iter.next()?;
1020 if self.kind.is_propagate_only() && !next.propagate {
1021 continue
1022 }
1023 return Some(*next.hash())
1024 }
1025 }
1026}
1027
1028pub(crate) struct FullPendingTransactionIter<Iter> {
1030 kind: TransactionListenerKind,
1031 iter: Iter,
1032}
1033
1034impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1035where
1036 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1037 T: PoolTransaction + 'a,
1038{
1039 type Item = NewTransactionEvent<T>;
1040
1041 fn next(&mut self) -> Option<Self::Item> {
1042 loop {
1043 let next = self.iter.next()?;
1044 if self.kind.is_propagate_only() && !next.propagate {
1045 continue
1046 }
1047 return Some(NewTransactionEvent {
1048 subpool: SubPool::Pending,
1049 transaction: next.clone(),
1050 })
1051 }
1052 }
1053}
1054
1055#[derive(Debug, Clone)]
1057pub enum AddedTransaction<T: PoolTransaction> {
1058 Pending(AddedPendingTransaction<T>),
1060 Parked {
1063 transaction: Arc<ValidPoolTransaction<T>>,
1065 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1067 subpool: SubPool,
1069 },
1070}
1071
1072impl<T: PoolTransaction> AddedTransaction<T> {
1073 pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1075 match self {
1076 Self::Pending(tx) => Some(tx),
1077 _ => None,
1078 }
1079 }
1080
1081 pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1083 match self {
1084 Self::Pending(tx) => tx.replaced.as_ref(),
1085 Self::Parked { replaced, .. } => replaced.as_ref(),
1086 }
1087 }
1088
1089 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1091 match self {
1092 Self::Pending(tx) => Some(&tx.discarded),
1093 Self::Parked { .. } => None,
1094 }
1095 }
1096
1097 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1099 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1100 }
1101
1102 pub(crate) fn hash(&self) -> &TxHash {
1104 match self {
1105 Self::Pending(tx) => tx.transaction.hash(),
1106 Self::Parked { transaction, .. } => transaction.hash(),
1107 }
1108 }
1109
1110 pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1112 match self {
1113 Self::Pending(tx) => {
1114 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1115 }
1116 Self::Parked { transaction, subpool, .. } => {
1117 NewTransactionEvent { transaction, subpool }
1118 }
1119 }
1120 }
1121
1122 #[cfg(test)]
1124 pub(crate) const fn subpool(&self) -> SubPool {
1125 match self {
1126 Self::Pending(_) => SubPool::Pending,
1127 Self::Parked { subpool, .. } => *subpool,
1128 }
1129 }
1130
1131 #[cfg(test)]
1133 pub(crate) fn id(&self) -> &TransactionId {
1134 match self {
1135 Self::Pending(added) => added.transaction.id(),
1136 Self::Parked { transaction, .. } => transaction.id(),
1137 }
1138 }
1139}
1140
1141#[derive(Debug)]
1143pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1144 pub(crate) block_hash: B256,
1146 pub(crate) mined: Vec<TxHash>,
1148 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1150 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1152}
1153
1154impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1155 pub(crate) fn pending_transactions(
1161 &self,
1162 kind: TransactionListenerKind,
1163 ) -> impl Iterator<Item = B256> + '_ {
1164 let iter = self.promoted.iter();
1165 PendingTransactionIter { kind, iter }
1166 }
1167
1168 pub(crate) fn full_pending_transactions(
1174 &self,
1175 kind: TransactionListenerKind,
1176 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1177 let iter = self.promoted.iter();
1178 FullPendingTransactionIter { kind, iter }
1179 }
1180}
1181
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::eip4844::BlobTransactionSidecar;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool limit allows and checks that the pool's
    /// blob store ends up equal to a store holding only the sidecars of the first `max_txs`
    /// transactions.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Load the hex-encoded blob from the JSON fixture shipped with the crate.
        let fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json");
        let json_content = fs::read_to_string(fixture).expect("Failed to read the blob data file");
        let json_value: serde_json::Value =
            serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
        let blob_hex = json_value
            .get("data")
            .unwrap()
            .as_str()
            .expect("Data is not a valid string")
            .to_string();

        let sidecar = BlobTransactionSidecar::try_from_blobs_hex(vec![blob_hex]).unwrap();

        // Limit the blob sub-pool by transaction count only.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Reference store holding the sidecars that are expected to survive.
        let blob_store = InMemoryBlobStore::default();

        let total_txs = blob_limit.max_txs + 10;
        for n in 0..total_txs {
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            tx.set_size(1844674407370951);

            // Only transactions within the limit are expected to keep their sidecar.
            let within_limit = n < blob_limit.max_txs;
            if within_limit {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            let outcome = TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                transaction: ValidTransaction::ValidWithSidecar {
                    transaction: tx,
                    sidecar: sidecar.clone(),
                },
                propagate: true,
            };
            test_pool.add_transactions(TransactionOrigin::External, [outcome]);
        }

        // The blob sub-pool is filled exactly up to its transaction limit.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // The pool's blob store matches the reference store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }
}
1273}