1use crate::{
69 error::{PoolError, PoolErrorKind, PoolResult},
70 identifier::{SenderId, SenderIdentifiers, TransactionId},
71 pool::{
72 listener::PoolEventBroadcast,
73 state::SubPool,
74 txpool::{SenderInfo, TxPool},
75 },
76 traits::{
77 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, NewTransactionEvent, PoolSize,
78 PoolTransaction, PropagatedTransactions, TransactionOrigin,
79 },
80 validate::{TransactionValidationOutcome, ValidPoolTransaction},
81 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
82 TransactionValidator,
83};
84
85use alloy_primitives::{Address, TxHash, B256};
86use best::BestTransactions;
87use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
88use reth_eth_wire_types::HandleMempoolData;
89use reth_execution_types::ChangedAccount;
90
91use alloy_eips::{eip4844::BlobTransactionSidecar, Typed2718};
92use reth_primitives_traits::Recovered;
93use rustc_hash::FxHashMap;
94use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
95use tokio::sync::mpsc;
96use tracing::{debug, trace, warn};
97mod events;
98use crate::{
99 blobstore::BlobStore,
100 metrics::BlobStoreMetrics,
101 pool::txpool::UpdateOutcome,
102 traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind},
103 validate::ValidTransaction,
104};
105pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
106pub use blob::{blob_tx_priority, fee_delta};
107pub use events::{FullTransactionEvent, TransactionEvent};
108pub use listener::{AllTransactionsEvents, TransactionEvents};
109pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
110pub use pending::PendingPool;
111use reth_primitives_traits::Block;
112
113mod best;
114mod blob;
115mod listener;
116mod parked;
117pub(crate) mod pending;
118pub(crate) mod size;
119pub(crate) mod state;
120pub mod txpool;
121mod update;
122
123pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
125pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
127
128const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
129
130pub struct PoolInner<V, T, S>
132where
133 T: TransactionOrdering,
134{
135 identifiers: RwLock<SenderIdentifiers>,
137 validator: V,
139 blob_store: S,
141 pool: RwLock<TxPool<T>>,
143 config: PoolConfig,
145 event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
147 pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
149 transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
151 blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
153 blob_store_metrics: BlobStoreMetrics,
155}
156
157impl<V, T, S> PoolInner<V, T, S>
160where
161 V: TransactionValidator,
162 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
163 S: BlobStore,
164{
165 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
167 Self {
168 identifiers: Default::default(),
169 validator,
170 event_listener: Default::default(),
171 pool: RwLock::new(TxPool::new(ordering, config.clone())),
172 pending_transaction_listener: Default::default(),
173 transaction_listener: Default::default(),
174 blob_transaction_sidecar_listener: Default::default(),
175 config,
176 blob_store,
177 blob_store_metrics: Default::default(),
178 }
179 }
180
181 pub const fn blob_store(&self) -> &S {
183 &self.blob_store
184 }
185
186 pub fn size(&self) -> PoolSize {
188 self.get_pool_data().size()
189 }
190
191 pub fn block_info(&self) -> BlockInfo {
193 self.get_pool_data().block_info()
194 }
195 pub fn set_block_info(&self, info: BlockInfo) {
197 self.pool.write().set_block_info(info)
198 }
199
200 pub fn get_sender_id(&self, addr: Address) -> SenderId {
202 self.identifiers.write().sender_id_or_create(addr)
203 }
204
205 pub fn unique_senders(&self) -> HashSet<Address> {
207 self.get_pool_data().unique_senders()
208 }
209
210 fn changed_senders(
213 &self,
214 accs: impl Iterator<Item = ChangedAccount>,
215 ) -> FxHashMap<SenderId, SenderInfo> {
216 let mut identifiers = self.identifiers.write();
217 accs.into_iter()
218 .map(|acc| {
219 let ChangedAccount { address, nonce, balance } = acc;
220 let sender_id = identifiers.sender_id_or_create(address);
221 (sender_id, SenderInfo { state_nonce: nonce, balance })
222 })
223 .collect()
224 }
225
226 pub const fn config(&self) -> &PoolConfig {
228 &self.config
229 }
230
231 pub const fn validator(&self) -> &V {
233 &self.validator
234 }
235
236 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
239 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
240 let listener = PendingTransactionHashListener { sender, kind };
241 self.pending_transaction_listener.lock().push(listener);
242 rx
243 }
244
245 pub fn add_new_transaction_listener(
247 &self,
248 kind: TransactionListenerKind,
249 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
250 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
251 let listener = TransactionListener { sender, kind };
252 self.transaction_listener.lock().push(listener);
253 rx
254 }
255 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
258 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
259 let listener = BlobTransactionSidecarListener { sender };
260 self.blob_transaction_sidecar_listener.lock().push(listener);
261 rx
262 }
263
264 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
267 self.get_pool_data()
268 .contains(&tx_hash)
269 .then(|| self.event_listener.write().subscribe(tx_hash))
270 }
271
272 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
274 self.event_listener.write().subscribe_all()
275 }
276
277 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
279 self.pool.read()
280 }
281
282 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
284 self.get_pool_data()
285 .all()
286 .transactions_iter()
287 .filter(|tx| tx.propagate)
288 .map(|tx| *tx.hash())
289 .collect()
290 }
291
292 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
294 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
295 }
296
297 pub fn pooled_transactions_max(
299 &self,
300 max: usize,
301 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
302 self.get_pool_data()
303 .all()
304 .transactions_iter()
305 .filter(|tx| tx.propagate)
306 .take(max)
307 .cloned()
308 .collect()
309 }
310
311 fn to_pooled_transaction(
316 &self,
317 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
318 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
319 where
320 <V as TransactionValidator>::Transaction: EthPoolTransaction,
321 {
322 if transaction.is_eip4844() {
323 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
324 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
325 } else {
326 transaction
327 .transaction
328 .clone()
329 .try_into_pooled()
330 .inspect_err(|err| {
331 debug!(
332 target: "txpool", %err,
333 "failed to convert transaction to pooled element; skipping",
334 );
335 })
336 .ok()
337 }
338 }
339
340 pub fn get_pooled_transaction_elements(
342 &self,
343 tx_hashes: Vec<TxHash>,
344 limit: GetPooledTransactionLimit,
345 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
346 where
347 <V as TransactionValidator>::Transaction: EthPoolTransaction,
348 {
349 let transactions = self.get_all(tx_hashes);
350 let mut elements = Vec::with_capacity(transactions.len());
351 let mut size = 0;
352 for transaction in transactions {
353 let encoded_len = transaction.encoded_length();
354 let Some(pooled) = self.to_pooled_transaction(transaction) else {
355 continue;
356 };
357
358 size += encoded_len;
359 elements.push(pooled.into_inner());
360
361 if limit.exceeds(size) {
362 break
363 }
364 }
365
366 elements
367 }
368
369 pub fn get_pooled_transaction_element(
371 &self,
372 tx_hash: TxHash,
373 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
374 where
375 <V as TransactionValidator>::Transaction: EthPoolTransaction,
376 {
377 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
378 }
379
380 pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
382 where
383 B: Block,
384 {
385 trace!(target: "txpool", ?update, "updating pool on canonical state change");
386
387 let block_info = update.block_info();
388 let CanonicalStateUpdate {
389 new_tip, changed_accounts, mined_transactions, update_kind, ..
390 } = update;
391 self.validator.on_new_head_block(new_tip);
392
393 let changed_senders = self.changed_senders(changed_accounts.into_iter());
394
395 let outcome = self.pool.write().on_canonical_state_change(
397 block_info,
398 mined_transactions,
399 changed_senders,
400 update_kind,
401 );
402
403 self.delete_discarded_blobs(outcome.discarded.iter());
405
406 self.notify_on_new_state(outcome);
408 }
409
410 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
414 let changed_senders = self.changed_senders(accounts.into_iter());
415 let UpdateOutcome { promoted, discarded } =
416 self.pool.write().update_accounts(changed_senders);
417 let mut listener = self.event_listener.write();
418
419 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
420 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
421
422 self.delete_discarded_blobs(discarded.iter());
425 }
426
427 fn add_transaction(
432 &self,
433 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
434 origin: TransactionOrigin,
435 tx: TransactionValidationOutcome<T::Transaction>,
436 ) -> PoolResult<TxHash> {
437 match tx {
438 TransactionValidationOutcome::Valid {
439 balance,
440 state_nonce,
441 transaction,
442 propagate,
443 } => {
444 let sender_id = self.get_sender_id(transaction.sender());
445 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
446
447 let (transaction, maybe_sidecar) = match transaction {
449 ValidTransaction::Valid(tx) => (tx, None),
450 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
451 debug_assert!(
452 transaction.is_eip4844(),
453 "validator returned sidecar for non EIP-4844 transaction"
454 );
455 (transaction, Some(sidecar))
456 }
457 };
458
459 let tx = ValidPoolTransaction {
460 transaction,
461 transaction_id,
462 propagate,
463 timestamp: Instant::now(),
464 origin,
465 };
466
467 let added = pool.add_transaction(tx, balance, state_nonce)?;
468 let hash = *added.hash();
469
470 if let Some(sidecar) = maybe_sidecar {
472 self.on_new_blob_sidecar(&hash, &sidecar);
474 self.insert_blob(hash, sidecar);
476 }
477
478 if let Some(replaced) = added.replaced_blob_transaction() {
479 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
480 self.delete_blob(replaced);
482 }
483
484 if let Some(pending) = added.as_pending() {
486 self.on_new_pending_transaction(pending);
487 }
488
489 self.notify_event_listeners(&added);
491
492 if let Some(discarded) = added.discarded_transactions() {
493 self.delete_discarded_blobs(discarded.iter());
494 }
495
496 self.on_new_transaction(added.into_new_transaction_event());
498
499 Ok(hash)
500 }
501 TransactionValidationOutcome::Invalid(tx, err) => {
502 let mut listener = self.event_listener.write();
503 listener.discarded(tx.hash());
504 Err(PoolError::new(*tx.hash(), err))
505 }
506 TransactionValidationOutcome::Error(tx_hash, err) => {
507 let mut listener = self.event_listener.write();
508 listener.discarded(&tx_hash);
509 Err(PoolError::other(tx_hash, err))
510 }
511 }
512 }
513
514 pub fn add_transaction_and_subscribe(
516 &self,
517 origin: TransactionOrigin,
518 tx: TransactionValidationOutcome<T::Transaction>,
519 ) -> PoolResult<TransactionEvents> {
520 let listener = {
521 let mut listener = self.event_listener.write();
522 listener.subscribe(tx.tx_hash())
523 };
524 let mut results = self.add_transactions(origin, std::iter::once(tx));
525 results.pop().expect("result length is the same as the input")?;
526 Ok(listener)
527 }
528
529 pub fn add_transactions(
535 &self,
536 origin: TransactionOrigin,
537 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
538 ) -> Vec<PoolResult<TxHash>> {
539 let (mut added, discarded) = {
541 let mut pool = self.pool.write();
542 let added = transactions
543 .into_iter()
544 .map(|tx| self.add_transaction(&mut pool, origin, tx))
545 .collect::<Vec<_>>();
546
547 let discarded = if added.iter().any(Result::is_ok) {
549 pool.discard_worst()
550 } else {
551 Default::default()
552 };
553
554 (added, discarded)
555 };
556
557 if !discarded.is_empty() {
558 self.delete_discarded_blobs(discarded.iter());
560
561 let discarded_hashes =
562 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
563
564 {
565 let mut listener = self.event_listener.write();
566 discarded_hashes.iter().for_each(|hash| listener.discarded(hash));
567 }
568
569 for res in &mut added {
572 if let Ok(hash) = res {
573 if discarded_hashes.contains(hash) {
574 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
575 }
576 }
577 }
578 }
579
580 added
581 }
582
583 fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
585 let propagate_allowed = pending.is_propagate_allowed();
586
587 let mut transaction_listeners = self.pending_transaction_listener.lock();
588 transaction_listeners.retain_mut(|listener| {
589 if listener.kind.is_propagate_only() && !propagate_allowed {
590 return !listener.sender.is_closed()
593 }
594
595 listener.send_all(pending.pending_transactions(listener.kind))
597 });
598 }
599
600 fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
602 let mut transaction_listeners = self.transaction_listener.lock();
603 transaction_listeners.retain_mut(|listener| {
604 if listener.kind.is_propagate_only() && !event.transaction.propagate {
605 return !listener.sender.is_closed()
608 }
609
610 listener.send(event.clone())
611 });
612 }
613
614 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecar) {
616 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
617 if sidecar_listeners.is_empty() {
618 return
619 }
620 let sidecar = Arc::new(sidecar.clone());
621 sidecar_listeners.retain_mut(|listener| {
622 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
623 match listener.sender.try_send(new_blob_event) {
624 Ok(()) => true,
625 Err(err) => {
626 if matches!(err, mpsc::error::TrySendError::Full(_)) {
627 debug!(
628 target: "txpool",
629 "[{:?}] failed to send blob sidecar; channel full",
630 sidecar,
631 );
632 true
633 } else {
634 false
635 }
636 }
637 }
638 })
639 }
640
641 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
643 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
644
645 self.pending_transaction_listener
648 .lock()
649 .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
650
651 self.transaction_listener.lock().retain_mut(|listener| {
653 listener.send_all(outcome.full_pending_transactions(listener.kind))
654 });
655
656 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
657
658 let mut listener = self.event_listener.write();
660
661 mined.iter().for_each(|tx| listener.mined(tx, block_hash));
662 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
663 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
664 }
665
666 fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
668 let mut listener = self.event_listener.write();
669
670 match tx {
671 AddedTransaction::Pending(tx) => {
672 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
673
674 listener.pending(transaction.hash(), replaced.clone());
675 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
676 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
677 }
678 AddedTransaction::Parked { transaction, replaced, .. } => {
679 listener.queued(transaction.hash());
680 if let Some(replaced) = replaced {
681 listener.replaced(replaced.clone(), *transaction.hash());
682 }
683 }
684 }
685 }
686
687 pub fn best_transactions(&self) -> BestTransactions<T> {
689 self.get_pool_data().best_transactions()
690 }
691
692 pub fn best_transactions_with_attributes(
695 &self,
696 best_transactions_attributes: BestTransactionsAttributes,
697 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
698 {
699 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
700 }
701
702 pub fn pending_transactions_max(
704 &self,
705 max: usize,
706 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
707 self.get_pool_data().pending_transactions_iter().take(max).collect()
708 }
709
710 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
712 self.get_pool_data().pending_transactions()
713 }
714
715 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
717 self.get_pool_data().queued_transactions()
718 }
719
720 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
722 let pool = self.get_pool_data();
723 AllPoolTransactions {
724 pending: pool.pending_transactions(),
725 queued: pool.queued_transactions(),
726 }
727 }
728
729 pub fn remove_transactions(
731 &self,
732 hashes: Vec<TxHash>,
733 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
734 if hashes.is_empty() {
735 return Vec::new()
736 }
737 let removed = self.pool.write().remove_transactions(hashes);
738
739 let mut listener = self.event_listener.write();
740
741 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
742
743 removed
744 }
745
746 pub fn remove_transactions_and_descendants(
749 &self,
750 hashes: Vec<TxHash>,
751 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
752 if hashes.is_empty() {
753 return Vec::new()
754 }
755 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
756
757 let mut listener = self.event_listener.write();
758
759 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
760
761 removed
762 }
763
764 pub fn remove_transactions_by_sender(
766 &self,
767 sender: Address,
768 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
769 let sender_id = self.get_sender_id(sender);
770 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
771
772 let mut listener = self.event_listener.write();
773
774 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
775
776 removed
777 }
778
779 pub fn retain_unknown<A>(&self, announcement: &mut A)
781 where
782 A: HandleMempoolData,
783 {
784 if announcement.is_empty() {
785 return
786 }
787 let pool = self.get_pool_data();
788 announcement.retain_by_hash(|tx| !pool.contains(tx))
789 }
790
791 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
793 self.get_pool_data().get(tx_hash)
794 }
795
796 pub fn get_transactions_by_sender(
798 &self,
799 sender: Address,
800 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
801 let sender_id = self.get_sender_id(sender);
802 self.get_pool_data().get_transactions_by_sender(sender_id)
803 }
804
805 pub fn get_queued_transactions_by_sender(
807 &self,
808 sender: Address,
809 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
810 let sender_id = self.get_sender_id(sender);
811 self.get_pool_data().queued_txs_by_sender(sender_id)
812 }
813
814 pub fn pending_transactions_with_predicate(
816 &self,
817 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
818 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
819 self.get_pool_data().pending_transactions_with_predicate(predicate)
820 }
821
822 pub fn get_pending_transactions_by_sender(
824 &self,
825 sender: Address,
826 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
827 let sender_id = self.get_sender_id(sender);
828 self.get_pool_data().pending_txs_by_sender(sender_id)
829 }
830
831 pub fn get_highest_transaction_by_sender(
833 &self,
834 sender: Address,
835 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
836 let sender_id = self.get_sender_id(sender);
837 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
838 }
839
840 pub fn get_highest_consecutive_transaction_by_sender(
842 &self,
843 sender: Address,
844 on_chain_nonce: u64,
845 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
846 let sender_id = self.get_sender_id(sender);
847 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
848 sender_id.into_transaction_id(on_chain_nonce),
849 )
850 }
851
852 pub fn get_transaction_by_transaction_id(
854 &self,
855 transaction_id: &TransactionId,
856 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
857 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
858 }
859
860 pub fn get_transactions_by_origin(
862 &self,
863 origin: TransactionOrigin,
864 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
865 self.get_pool_data()
866 .all()
867 .transactions_iter()
868 .filter(|tx| tx.origin == origin)
869 .cloned()
870 .collect()
871 }
872
873 pub fn get_pending_transactions_by_origin(
875 &self,
876 origin: TransactionOrigin,
877 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
878 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
879 }
880
881 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
885 if txs.is_empty() {
886 return Vec::new()
887 }
888 self.get_pool_data().get_all(txs).collect()
889 }
890
891 pub fn on_propagated(&self, txs: PropagatedTransactions) {
893 if txs.0.is_empty() {
894 return
895 }
896 let mut listener = self.event_listener.write();
897
898 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers))
899 }
900
901 pub fn len(&self) -> usize {
903 self.get_pool_data().len()
904 }
905
906 pub fn is_empty(&self) -> bool {
908 self.get_pool_data().is_empty()
909 }
910
911 pub fn is_exceeded(&self) -> bool {
913 self.pool.read().is_exceeded()
914 }
915
916 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
918 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
919 if let Err(err) = self.blob_store.insert(hash, blob) {
920 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
921 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
922 }
923 self.update_blob_store_metrics();
924 }
925
926 pub fn delete_blob(&self, blob: TxHash) {
928 let _ = self.blob_store.delete(blob);
929 }
930
931 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
933 let _ = self.blob_store.delete_all(txs);
934 }
935
936 pub fn cleanup_blobs(&self) {
938 let stat = self.blob_store.cleanup();
939 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
940 self.update_blob_store_metrics();
941 }
942
943 fn update_blob_store_metrics(&self) {
944 if let Some(data_size) = self.blob_store.data_size_hint() {
945 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
946 }
947 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
948 }
949
950 fn delete_discarded_blobs<'a>(
952 &'a self,
953 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
954 ) {
955 let blob_txs = transactions
956 .into_iter()
957 .filter(|tx| tx.transaction.is_eip4844())
958 .map(|tx| *tx.hash())
959 .collect();
960 self.delete_blobs(blob_txs);
961 }
962}
963
964impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
965 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
966 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
967 }
968}
969
970#[derive(Debug)]
972struct PendingTransactionHashListener {
973 sender: mpsc::Sender<TxHash>,
974 kind: TransactionListenerKind,
976}
977
978impl PendingTransactionHashListener {
979 fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
983 for tx_hash in hashes {
984 match self.sender.try_send(tx_hash) {
985 Ok(()) => {}
986 Err(err) => {
987 return if matches!(err, mpsc::error::TrySendError::Full(_)) {
988 debug!(
989 target: "txpool",
990 "[{:?}] failed to send pending tx; channel full",
991 tx_hash,
992 );
993 true
994 } else {
995 false
996 }
997 }
998 }
999 }
1000 true
1001 }
1002}
1003
1004#[derive(Debug)]
1006struct TransactionListener<T: PoolTransaction> {
1007 sender: mpsc::Sender<NewTransactionEvent<T>>,
1008 kind: TransactionListenerKind,
1010}
1011
1012impl<T: PoolTransaction> TransactionListener<T> {
1013 fn send(&self, event: NewTransactionEvent<T>) -> bool {
1017 self.send_all(std::iter::once(event))
1018 }
1019
1020 fn send_all(&self, events: impl IntoIterator<Item = NewTransactionEvent<T>>) -> bool {
1024 for event in events {
1025 match self.sender.try_send(event) {
1026 Ok(()) => {}
1027 Err(err) => {
1028 return if let mpsc::error::TrySendError::Full(event) = err {
1029 debug!(
1030 target: "txpool",
1031 "[{:?}] failed to send pending tx; channel full",
1032 event.transaction.hash(),
1033 );
1034 true
1035 } else {
1036 false
1037 }
1038 }
1039 }
1040 }
1041 true
1042 }
1043}
1044
1045#[derive(Debug)]
1047struct BlobTransactionSidecarListener {
1048 sender: mpsc::Sender<NewBlobSidecar>,
1049}
1050
1051#[derive(Debug, Clone)]
1053pub struct AddedPendingTransaction<T: PoolTransaction> {
1054 transaction: Arc<ValidPoolTransaction<T>>,
1056 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1058 promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1060 discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1062}
1063
1064impl<T: PoolTransaction> AddedPendingTransaction<T> {
1065 pub(crate) fn pending_transactions(
1071 &self,
1072 kind: TransactionListenerKind,
1073 ) -> impl Iterator<Item = B256> + '_ {
1074 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1075 PendingTransactionIter { kind, iter }
1076 }
1077
1078 pub(crate) fn is_propagate_allowed(&self) -> bool {
1080 self.transaction.propagate
1081 }
1082}
1083
1084pub(crate) struct PendingTransactionIter<Iter> {
1085 kind: TransactionListenerKind,
1086 iter: Iter,
1087}
1088
1089impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1090where
1091 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1092 T: PoolTransaction + 'a,
1093{
1094 type Item = B256;
1095
1096 fn next(&mut self) -> Option<Self::Item> {
1097 loop {
1098 let next = self.iter.next()?;
1099 if self.kind.is_propagate_only() && !next.propagate {
1100 continue
1101 }
1102 return Some(*next.hash())
1103 }
1104 }
1105}
1106
1107pub(crate) struct FullPendingTransactionIter<Iter> {
1109 kind: TransactionListenerKind,
1110 iter: Iter,
1111}
1112
1113impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1114where
1115 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1116 T: PoolTransaction + 'a,
1117{
1118 type Item = NewTransactionEvent<T>;
1119
1120 fn next(&mut self) -> Option<Self::Item> {
1121 loop {
1122 let next = self.iter.next()?;
1123 if self.kind.is_propagate_only() && !next.propagate {
1124 continue
1125 }
1126 return Some(NewTransactionEvent {
1127 subpool: SubPool::Pending,
1128 transaction: next.clone(),
1129 })
1130 }
1131 }
1132}
1133
1134#[derive(Debug, Clone)]
1136pub enum AddedTransaction<T: PoolTransaction> {
1137 Pending(AddedPendingTransaction<T>),
1139 Parked {
1142 transaction: Arc<ValidPoolTransaction<T>>,
1144 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1146 subpool: SubPool,
1148 },
1149}
1150
1151impl<T: PoolTransaction> AddedTransaction<T> {
1152 pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1154 match self {
1155 Self::Pending(tx) => Some(tx),
1156 _ => None,
1157 }
1158 }
1159
1160 pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1162 match self {
1163 Self::Pending(tx) => tx.replaced.as_ref(),
1164 Self::Parked { replaced, .. } => replaced.as_ref(),
1165 }
1166 }
1167
1168 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1170 match self {
1171 Self::Pending(tx) => Some(&tx.discarded),
1172 Self::Parked { .. } => None,
1173 }
1174 }
1175
1176 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1178 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1179 }
1180
1181 pub(crate) fn hash(&self) -> &TxHash {
1183 match self {
1184 Self::Pending(tx) => tx.transaction.hash(),
1185 Self::Parked { transaction, .. } => transaction.hash(),
1186 }
1187 }
1188
1189 pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1191 match self {
1192 Self::Pending(tx) => {
1193 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1194 }
1195 Self::Parked { transaction, subpool, .. } => {
1196 NewTransactionEvent { transaction, subpool }
1197 }
1198 }
1199 }
1200
1201 #[cfg(test)]
1203 pub(crate) const fn subpool(&self) -> SubPool {
1204 match self {
1205 Self::Pending(_) => SubPool::Pending,
1206 Self::Parked { subpool, .. } => *subpool,
1207 }
1208 }
1209
1210 #[cfg(test)]
1212 pub(crate) fn id(&self) -> &TransactionId {
1213 match self {
1214 Self::Pending(added) => added.transaction.id(),
1215 Self::Parked { transaction, .. } => transaction.id(),
1216 }
1217 }
1218}
1219
1220#[derive(Debug)]
1222pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1223 pub(crate) block_hash: B256,
1225 pub(crate) mined: Vec<TxHash>,
1227 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1229 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1231}
1232
1233impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1234 pub(crate) fn pending_transactions(
1240 &self,
1241 kind: TransactionListenerKind,
1242 ) -> impl Iterator<Item = B256> + '_ {
1243 let iter = self.promoted.iter();
1244 PendingTransactionIter { kind, iter }
1245 }
1246
1247 pub(crate) fn full_pending_transactions(
1253 &self,
1254 kind: TransactionListenerKind,
1255 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1256 let iter = self.promoted.iter();
1257 FullPendingTransactionIter { kind, iter }
1258 }
1259}
1260
1261#[cfg(test)]
1262mod tests {
1263 use crate::{
1264 blobstore::{BlobStore, InMemoryBlobStore},
1265 test_utils::{MockTransaction, TestPoolBuilder},
1266 validate::ValidTransaction,
1267 BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1268 };
1269 use alloy_eips::eip4844::BlobTransactionSidecar;
1270 use std::{fs, path::PathBuf};
1271
1272 #[test]
1273 fn test_discard_blobs_on_blob_tx_eviction() {
1274 let blobs = {
1275 let json_content = fs::read_to_string(
1277 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1278 )
1279 .expect("Failed to read the blob data file");
1280
1281 let json_value: serde_json::Value =
1283 serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1284
1285 vec![
1287 json_value
1289 .get("data")
1290 .unwrap()
1291 .as_str()
1292 .expect("Data is not a valid string")
1293 .to_string(),
1294 ]
1295 };
1296
1297 let sidecar = BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap();
1299
1300 let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1302
1303 let test_pool = &TestPoolBuilder::default()
1305 .with_config(PoolConfig { blob_limit, ..Default::default() })
1306 .pool;
1307
1308 test_pool
1310 .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1311
1312 let blob_store = InMemoryBlobStore::default();
1314
1315 for n in 0..blob_limit.max_txs + 10 {
1317 let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1319
1320 tx.set_size(1844674407370951);
1322
1323 if n < blob_limit.max_txs {
1325 blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1326 }
1327
1328 test_pool.add_transactions(
1330 TransactionOrigin::External,
1331 [TransactionValidationOutcome::Valid {
1332 balance: U256::from(1_000),
1333 state_nonce: 0,
1334 transaction: ValidTransaction::ValidWithSidecar {
1335 transaction: tx,
1336 sidecar: sidecar.clone(),
1337 },
1338 propagate: true,
1339 }],
1340 );
1341 }
1342
1343 assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1345
1346 assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1348
1349 assert_eq!(*test_pool.blob_store(), blob_store);
1351 }
1352}