1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
93use reth_eth_wire_types::HandleMempoolData;
94use reth_execution_types::ChangedAccount;
95
96use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
97use reth_primitives_traits::Recovered;
98use rustc_hash::FxHashMap;
99use std::{
100 collections::HashSet,
101 fmt,
102 sync::{
103 atomic::{AtomicBool, Ordering},
104 Arc,
105 },
106 time::Instant,
107};
108use tokio::sync::mpsc;
109use tracing::{debug, trace, warn};
110mod events;
111pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
112pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
113pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
114pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
115pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
116pub use pending::PendingPool;
117use reth_primitives_traits::Block;
118
119mod best;
120pub use best::BestTransactions;
121
122mod blob;
123pub mod listener;
124mod parked;
125pub mod pending;
126pub mod size;
127pub(crate) mod state;
128pub mod txpool;
129mod update;
130
/// Buffer size constant for pending-transaction listener channels.
///
/// NOTE(review): not referenced in this section; presumably the default for
/// `PoolConfig::pending_tx_listener_buffer_size` — confirm against the config module.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2_048;
/// Buffer size constant for new-transaction listener channels.
///
/// NOTE(review): not referenced in this section; presumably the default for
/// `PoolConfig::new_tx_listener_buffer_size` — confirm against the config module.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1_024;

/// Capacity of every channel created by `PoolInner::add_blob_sidecar_listener`.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
137
138pub struct PoolInner<V, T, S>
140where
141 T: TransactionOrdering,
142{
143 identifiers: RwLock<SenderIdentifiers>,
145 validator: V,
147 blob_store: S,
149 pool: RwLock<TxPool<T>>,
151 config: PoolConfig,
153 event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
155 has_event_listeners: AtomicBool,
157 pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
159 transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
161 blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
163 blob_store_metrics: BlobStoreMetrics,
165}
166
167impl<V, T, S> PoolInner<V, T, S>
170where
171 V: TransactionValidator,
172 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
173 S: BlobStore,
174{
175 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
177 Self {
178 identifiers: Default::default(),
179 validator,
180 event_listener: Default::default(),
181 has_event_listeners: AtomicBool::new(false),
182 pool: RwLock::new(TxPool::new(ordering, config.clone())),
183 pending_transaction_listener: Default::default(),
184 transaction_listener: Default::default(),
185 blob_transaction_sidecar_listener: Default::default(),
186 config,
187 blob_store,
188 blob_store_metrics: Default::default(),
189 }
190 }
191
192 pub const fn blob_store(&self) -> &S {
194 &self.blob_store
195 }
196
197 pub fn size(&self) -> PoolSize {
199 self.get_pool_data().size()
200 }
201
202 pub fn block_info(&self) -> BlockInfo {
204 self.get_pool_data().block_info()
205 }
206 pub fn set_block_info(&self, info: BlockInfo) {
208 self.pool.write().set_block_info(info)
209 }
210
211 pub fn get_sender_id(&self, addr: Address) -> SenderId {
213 self.identifiers.write().sender_id_or_create(addr)
214 }
215
216 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
218 self.identifiers.write().sender_ids_or_create(addrs)
219 }
220
221 pub fn unique_senders(&self) -> HashSet<Address> {
223 self.get_pool_data().unique_senders()
224 }
225
226 fn changed_senders(
229 &self,
230 accs: impl Iterator<Item = ChangedAccount>,
231 ) -> FxHashMap<SenderId, SenderInfo> {
232 let identifiers = self.identifiers.read();
233 accs.into_iter()
234 .filter_map(|acc| {
235 let ChangedAccount { address, nonce, balance } = acc;
236 let sender_id = identifiers.sender_id(&address)?;
237 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
238 })
239 .collect()
240 }
241
242 pub const fn config(&self) -> &PoolConfig {
244 &self.config
245 }
246
247 pub const fn validator(&self) -> &V {
249 &self.validator
250 }
251
252 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
255 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
256 let listener = PendingTransactionHashListener { sender, kind };
257
258 let mut listeners = self.pending_transaction_listener.write();
259 listeners.retain(|l| !l.sender.is_closed());
261 listeners.push(listener);
262
263 rx
264 }
265
266 pub fn add_new_transaction_listener(
268 &self,
269 kind: TransactionListenerKind,
270 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
271 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
272 let listener = TransactionListener { sender, kind };
273
274 let mut listeners = self.transaction_listener.write();
275 listeners.retain(|l| !l.sender.is_closed());
277 listeners.push(listener);
278
279 rx
280 }
281 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
284 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
285 let listener = BlobTransactionSidecarListener { sender };
286 self.blob_transaction_sidecar_listener.lock().push(listener);
287 rx
288 }
289
290 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
293 if !self.get_pool_data().contains(&tx_hash) {
294 return None
295 }
296 let mut listener = self.event_listener.write();
297 let events = listener.subscribe(tx_hash);
298 self.mark_event_listener_installed();
299 Some(events)
300 }
301
302 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
304 let mut listener = self.event_listener.write();
305 let events = listener.subscribe_all();
306 self.mark_event_listener_installed();
307 events
308 }
309
310 #[inline]
311 fn has_event_listeners(&self) -> bool {
312 self.has_event_listeners.load(Ordering::Relaxed)
313 }
314
315 #[inline]
316 fn mark_event_listener_installed(&self) {
317 self.has_event_listeners.store(true, Ordering::Relaxed);
318 }
319
320 #[inline]
321 fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
322 if listener.is_empty() {
323 self.has_event_listeners.store(false, Ordering::Relaxed);
324 }
325 }
326
327 #[inline]
328 fn with_event_listener<F>(&self, emit: F)
329 where
330 F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
331 {
332 if !self.has_event_listeners() {
333 return
334 }
335 let mut listener = self.event_listener.write();
336 if !listener.is_empty() {
337 emit(&mut listener);
338 }
339 self.update_event_listener_state(&listener);
340 }
341
342 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
344 self.pool.read()
345 }
346
347 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
349 let mut out = Vec::new();
350 self.append_pooled_transactions(&mut out);
351 out
352 }
353
354 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
356 let mut out = Vec::new();
357 self.append_pooled_transactions_hashes(&mut out);
358 out
359 }
360
361 pub fn pooled_transactions_max(
363 &self,
364 max: usize,
365 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
366 let mut out = Vec::new();
367 self.append_pooled_transactions_max(max, &mut out);
368 out
369 }
370
371 pub fn append_pooled_transactions(
373 &self,
374 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
375 ) {
376 out.extend(
377 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
378 );
379 }
380
381 pub fn append_pooled_transaction_elements(
384 &self,
385 tx_hashes: &[TxHash],
386 limit: GetPooledTransactionLimit,
387 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
388 ) where
389 <V as TransactionValidator>::Transaction: EthPoolTransaction,
390 {
391 let transactions = self.get_all_propagatable(tx_hashes);
392 let mut size = 0;
393 for transaction in transactions {
394 let encoded_len = transaction.encoded_length();
395 let Some(pooled) = self.to_pooled_transaction(transaction) else {
396 continue;
397 };
398
399 size += encoded_len;
400 out.push(pooled.into_inner());
401
402 if limit.exceeds(size) {
403 break
404 }
405 }
406 }
407
408 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
411 out.extend(
412 self.get_pool_data()
413 .all()
414 .transactions_iter()
415 .filter(|tx| tx.propagate)
416 .map(|tx| *tx.hash()),
417 );
418 }
419
420 pub fn append_pooled_transactions_max(
423 &self,
424 max: usize,
425 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
426 ) {
427 out.extend(
428 self.get_pool_data()
429 .all()
430 .transactions_iter()
431 .filter(|tx| tx.propagate)
432 .take(max)
433 .cloned(),
434 );
435 }
436
437 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
439 if max == 0 {
440 return Vec::new();
441 }
442 self.get_pool_data()
443 .all()
444 .transactions_iter()
445 .filter(|tx| tx.propagate)
446 .take(max)
447 .map(|tx| *tx.hash())
448 .collect()
449 }
450
451 fn to_pooled_transaction(
456 &self,
457 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
458 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
459 where
460 <V as TransactionValidator>::Transaction: EthPoolTransaction,
461 {
462 if transaction.is_eip4844() {
463 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
464 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
465 } else {
466 transaction
467 .transaction
468 .clone_into_pooled()
469 .inspect_err(|err| {
470 debug!(
471 target: "txpool", %err,
472 "failed to convert transaction to pooled element; skipping",
473 );
474 })
475 .ok()
476 }
477 }
478
479 pub fn get_pooled_transaction_elements(
482 &self,
483 tx_hashes: Vec<TxHash>,
484 limit: GetPooledTransactionLimit,
485 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
486 where
487 <V as TransactionValidator>::Transaction: EthPoolTransaction,
488 {
489 let mut elements = Vec::new();
490 self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
491 elements.shrink_to_fit();
492 elements
493 }
494
495 pub fn get_pooled_transaction_element(
497 &self,
498 tx_hash: TxHash,
499 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
500 where
501 <V as TransactionValidator>::Transaction: EthPoolTransaction,
502 {
503 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
504 }
505
506 pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
508 where
509 B: Block,
510 {
511 trace!(target: "txpool", ?update, "updating pool on canonical state change");
512
513 let block_info = update.block_info();
514 let CanonicalStateUpdate {
515 new_tip, changed_accounts, mined_transactions, update_kind, ..
516 } = update;
517 self.validator.on_new_head_block(new_tip);
518
519 let changed_senders = self.changed_senders(changed_accounts.into_iter());
520
521 let outcome = self.pool.write().on_canonical_state_change(
523 block_info,
524 mined_transactions,
525 changed_senders,
526 update_kind,
527 );
528
529 self.delete_discarded_blobs(outcome.discarded.iter());
531
532 self.notify_on_new_state(outcome);
534 }
535
536 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
542 let changed_senders = self.changed_senders(accounts.into_iter());
543 let UpdateOutcome { promoted, discarded } =
544 self.pool.write().update_accounts(changed_senders);
545
546 self.notify_on_transaction_updates(promoted, discarded);
547 }
548
549 fn add_transaction(
557 &self,
558 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
559 origin: TransactionOrigin,
560 tx: TransactionValidationOutcome<T::Transaction>,
561 ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
562 match tx {
563 TransactionValidationOutcome::Valid {
564 balance,
565 state_nonce,
566 transaction,
567 propagate,
568 bytecode_hash,
569 authorities,
570 } => {
571 let sender_id = self.get_sender_id(transaction.sender());
572 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
573
574 let (transaction, blob_sidecar) = match transaction {
576 ValidTransaction::Valid(tx) => (tx, None),
577 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
578 debug_assert!(
579 transaction.is_eip4844(),
580 "validator returned sidecar for non EIP-4844 transaction"
581 );
582 (transaction, Some(sidecar))
583 }
584 };
585
586 let tx = ValidPoolTransaction {
587 transaction,
588 transaction_id,
589 propagate,
590 timestamp: Instant::now(),
591 origin,
592 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
593 };
594
595 let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
596 Ok(added) => added,
597 Err(err) => return (Err(err), None),
598 };
599 let hash = *added.hash();
600 let state = added.transaction_state();
601
602 let meta = AddedTransactionMeta { added, blob_sidecar };
603
604 (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
605 }
606 TransactionValidationOutcome::Invalid(tx, err) => {
607 self.with_event_listener(|listener| listener.invalid(tx.hash()));
608 (Err(PoolError::new(*tx.hash(), err)), None)
609 }
610 TransactionValidationOutcome::Error(tx_hash, err) => {
611 self.with_event_listener(|listener| listener.discarded(&tx_hash));
612 (Err(PoolError::other(tx_hash, err)), None)
613 }
614 }
615 }
616
617 pub fn add_transaction_and_subscribe(
619 &self,
620 origin: TransactionOrigin,
621 tx: TransactionValidationOutcome<T::Transaction>,
622 ) -> PoolResult<TransactionEvents> {
623 let listener = {
624 let mut listener = self.event_listener.write();
625 let events = listener.subscribe(tx.tx_hash());
626 self.mark_event_listener_installed();
627 events
628 };
629 let mut results = self.add_transactions(origin, std::iter::once(tx));
630 results.pop().expect("result length is the same as the input")?;
631 Ok(listener)
632 }
633
634 pub fn add_transactions(
636 &self,
637 origin: TransactionOrigin,
638 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
639 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
640 let (mut results, added_metas, discarded) = {
642 let mut pool = self.pool.write();
643 let mut added_metas = Vec::new();
644
645 let results = transactions
646 .into_iter()
647 .map(|tx| {
648 let (result, meta) = self.add_transaction(&mut pool, origin, tx);
649
650 if result.is_ok() &&
652 let Some(meta) = meta
653 {
654 added_metas.push(meta);
655 }
656
657 result
658 })
659 .collect::<Vec<_>>();
660
661 let discarded = if results.iter().any(Result::is_ok) {
663 pool.discard_worst()
664 } else {
665 Default::default()
666 };
667
668 (results, added_metas, discarded)
669 };
670
671 for meta in added_metas {
672 self.on_added_transaction(meta);
673 }
674
675 if !discarded.is_empty() {
676 self.delete_discarded_blobs(discarded.iter());
678 self.with_event_listener(|listener| listener.discarded_many(&discarded));
679
680 let discarded_hashes =
681 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
682
683 for res in &mut results {
686 if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
687 discarded_hashes.contains(hash)
688 {
689 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
690 }
691 }
692 };
693
694 results
695 }
696
697 fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
702 if let Some(sidecar) = meta.blob_sidecar {
704 let hash = *meta.added.hash();
705 self.on_new_blob_sidecar(&hash, &sidecar);
706 self.insert_blob(hash, sidecar);
707 }
708
709 if let Some(replaced) = meta.added.replaced_blob_transaction() {
711 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
712 self.delete_blob(replaced);
713 }
714
715 if let Some(discarded) = meta.added.discarded_transactions() {
717 self.delete_discarded_blobs(discarded.iter());
718 }
719
720 if let Some(pending) = meta.added.as_pending() {
722 self.on_new_pending_transaction(pending);
723 }
724
725 self.notify_event_listeners(&meta.added);
727
728 self.on_new_transaction(meta.added.into_new_transaction_event());
730 }
731
732 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
741 let mut needs_cleanup = false;
742
743 {
744 let listeners = self.pending_transaction_listener.read();
745 for listener in listeners.iter() {
746 if !listener.send_all(pending.pending_transactions(listener.kind)) {
747 needs_cleanup = true;
748 }
749 }
750 }
751
752 if needs_cleanup {
754 self.pending_transaction_listener
755 .write()
756 .retain(|listener| !listener.sender.is_closed());
757 }
758 }
759
760 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
769 let mut needs_cleanup = false;
770
771 {
772 let listeners = self.transaction_listener.read();
773 for listener in listeners.iter() {
774 if listener.kind.is_propagate_only() && !event.transaction.propagate {
775 if listener.sender.is_closed() {
776 needs_cleanup = true;
777 }
778 continue
780 }
781
782 if !listener.send(event.clone()) {
783 needs_cleanup = true;
784 }
785 }
786 }
787
788 if needs_cleanup {
790 self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
791 }
792 }
793
794 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
796 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
797 if sidecar_listeners.is_empty() {
798 return
799 }
800 let sidecar = Arc::new(sidecar.clone());
801 sidecar_listeners.retain_mut(|listener| {
802 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
803 match listener.sender.try_send(new_blob_event) {
804 Ok(()) => true,
805 Err(err) => {
806 if matches!(err, mpsc::error::TrySendError::Full(_)) {
807 debug!(
808 target: "txpool",
809 "[{:?}] failed to send blob sidecar; channel full",
810 sidecar,
811 );
812 true
813 } else {
814 false
815 }
816 }
817 }
818 })
819 }
820
821 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
823 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
824
825 let mut needs_pending_cleanup = false;
827 {
828 let listeners = self.pending_transaction_listener.read();
829 for listener in listeners.iter() {
830 if !listener.send_all(outcome.pending_transactions(listener.kind)) {
831 needs_pending_cleanup = true;
832 }
833 }
834 }
835 if needs_pending_cleanup {
836 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
837 }
838
839 let mut needs_tx_cleanup = false;
841 {
842 let listeners = self.transaction_listener.read();
843 for listener in listeners.iter() {
844 if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
845 needs_tx_cleanup = true;
846 }
847 }
848 }
849 if needs_tx_cleanup {
850 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
851 }
852
853 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
854
855 self.with_event_listener(|listener| {
857 for tx in &mined {
858 listener.mined(tx, block_hash);
859 }
860 for tx in &promoted {
861 listener.pending(tx.hash(), None);
862 }
863 for tx in &discarded {
864 listener.discarded(tx.hash());
865 }
866 })
867 }
868
869 #[allow(clippy::type_complexity)]
878 pub fn notify_on_transaction_updates(
879 &self,
880 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
881 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
882 ) {
883 if !promoted.is_empty() {
885 let mut needs_pending_cleanup = false;
886 {
887 let listeners = self.pending_transaction_listener.read();
888 for listener in listeners.iter() {
889 let promoted_hashes = promoted.iter().filter_map(|tx| {
890 if listener.kind.is_propagate_only() && !tx.propagate {
891 None
892 } else {
893 Some(*tx.hash())
894 }
895 });
896 if !listener.send_all(promoted_hashes) {
897 needs_pending_cleanup = true;
898 }
899 }
900 }
901 if needs_pending_cleanup {
902 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
903 }
904
905 let mut needs_tx_cleanup = false;
907 {
908 let listeners = self.transaction_listener.read();
909 for listener in listeners.iter() {
910 let promoted_txs = promoted.iter().filter_map(|tx| {
911 if listener.kind.is_propagate_only() && !tx.propagate {
912 None
913 } else {
914 Some(NewTransactionEvent::pending(tx.clone()))
915 }
916 });
917 if !listener.send_all(promoted_txs) {
918 needs_tx_cleanup = true;
919 }
920 }
921 }
922 if needs_tx_cleanup {
923 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
924 }
925 }
926
927 self.with_event_listener(|listener| {
928 for tx in &promoted {
929 listener.pending(tx.hash(), None);
930 }
931 for tx in &discarded {
932 listener.discarded(tx.hash());
933 }
934 });
935
936 if !discarded.is_empty() {
937 self.delete_discarded_blobs(discarded.iter());
940 }
941 }
942
943 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
952 self.with_event_listener(|listener| match tx {
953 AddedTransaction::Pending(tx) => {
954 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
955
956 listener.pending(transaction.hash(), replaced.clone());
957 for tx in promoted {
958 listener.pending(tx.hash(), None);
959 }
960 for tx in discarded {
961 listener.discarded(tx.hash());
962 }
963 }
964 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
965 listener.queued(transaction.hash(), queued_reason.clone());
966 if let Some(replaced) = replaced {
967 listener.replaced(replaced.clone(), *transaction.hash());
968 }
969 }
970 });
971 }
972
973 pub fn best_transactions(&self) -> BestTransactions<T> {
975 self.get_pool_data().best_transactions()
976 }
977
978 pub fn best_transactions_with_attributes(
981 &self,
982 best_transactions_attributes: BestTransactionsAttributes,
983 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
984 {
985 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
986 }
987
988 pub fn pending_transactions_max(
990 &self,
991 max: usize,
992 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
993 self.get_pool_data().pending_transactions_iter().take(max).collect()
994 }
995
996 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
998 self.get_pool_data().pending_transactions()
999 }
1000
1001 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1003 self.get_pool_data().queued_transactions()
1004 }
1005
1006 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1008 let pool = self.get_pool_data();
1009 AllPoolTransactions {
1010 pending: pool.pending_transactions(),
1011 queued: pool.queued_transactions(),
1012 }
1013 }
1014
1015 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1017 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1018 }
1019
1020 pub fn remove_transactions(
1025 &self,
1026 hashes: Vec<TxHash>,
1027 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1028 if hashes.is_empty() {
1029 return Vec::new()
1030 }
1031 let removed = self.pool.write().remove_transactions(hashes);
1032
1033 self.with_event_listener(|listener| listener.discarded_many(&removed));
1034
1035 removed
1036 }
1037
1038 pub fn remove_transactions_and_descendants(
1041 &self,
1042 hashes: Vec<TxHash>,
1043 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1044 if hashes.is_empty() {
1045 return Vec::new()
1046 }
1047 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1048
1049 self.with_event_listener(|listener| {
1050 for tx in &removed {
1051 listener.discarded(tx.hash());
1052 }
1053 });
1054
1055 removed
1056 }
1057
1058 pub fn remove_transactions_by_sender(
1060 &self,
1061 sender: Address,
1062 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1063 let sender_id = self.get_sender_id(sender);
1064 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1065
1066 self.with_event_listener(|listener| listener.discarded_many(&removed));
1067
1068 removed
1069 }
1070
1071 pub fn retain_unknown<A>(&self, announcement: &mut A)
1073 where
1074 A: HandleMempoolData,
1075 {
1076 if announcement.is_empty() {
1077 return
1078 }
1079 let pool = self.get_pool_data();
1080 announcement.retain_by_hash(|tx| !pool.contains(tx))
1081 }
1082
1083 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1085 self.get_pool_data().get(tx_hash)
1086 }
1087
1088 pub fn get_transactions_by_sender(
1090 &self,
1091 sender: Address,
1092 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1093 let sender_id = self.get_sender_id(sender);
1094 self.get_pool_data().get_transactions_by_sender(sender_id)
1095 }
1096
1097 pub fn get_queued_transactions_by_sender(
1099 &self,
1100 sender: Address,
1101 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1102 let sender_id = self.get_sender_id(sender);
1103 self.get_pool_data().queued_txs_by_sender(sender_id)
1104 }
1105
1106 pub fn pending_transactions_with_predicate(
1108 &self,
1109 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1110 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1111 self.get_pool_data().pending_transactions_with_predicate(predicate)
1112 }
1113
1114 pub fn get_pending_transactions_by_sender(
1116 &self,
1117 sender: Address,
1118 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1119 let sender_id = self.get_sender_id(sender);
1120 self.get_pool_data().pending_txs_by_sender(sender_id)
1121 }
1122
1123 pub fn get_highest_transaction_by_sender(
1125 &self,
1126 sender: Address,
1127 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1128 let sender_id = self.get_sender_id(sender);
1129 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1130 }
1131
1132 pub fn get_highest_consecutive_transaction_by_sender(
1134 &self,
1135 sender: Address,
1136 on_chain_nonce: u64,
1137 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1138 let sender_id = self.get_sender_id(sender);
1139 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1140 sender_id.into_transaction_id(on_chain_nonce),
1141 )
1142 }
1143
1144 pub fn get_transaction_by_transaction_id(
1146 &self,
1147 transaction_id: &TransactionId,
1148 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1149 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1150 }
1151
1152 pub fn get_transactions_by_origin(
1154 &self,
1155 origin: TransactionOrigin,
1156 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1157 self.get_pool_data()
1158 .all()
1159 .transactions_iter()
1160 .filter(|tx| tx.origin == origin)
1161 .cloned()
1162 .collect()
1163 }
1164
1165 pub fn get_pending_transactions_by_origin(
1167 &self,
1168 origin: TransactionOrigin,
1169 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1170 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1171 }
1172
1173 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1177 if txs.is_empty() {
1178 return Vec::new()
1179 }
1180 self.get_pool_data().get_all(txs).collect()
1181 }
1182
1183 fn get_all_propagatable(
1187 &self,
1188 txs: &[TxHash],
1189 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1190 if txs.is_empty() {
1191 return Vec::new()
1192 }
1193 let pool = self.get_pool_data();
1194 txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1195 }
1196
1197 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1199 if txs.0.is_empty() {
1200 return
1201 }
1202 self.with_event_listener(|listener| {
1203 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1204 });
1205 }
1206
1207 pub fn len(&self) -> usize {
1209 self.get_pool_data().len()
1210 }
1211
1212 pub fn is_empty(&self) -> bool {
1214 self.get_pool_data().is_empty()
1215 }
1216
1217 pub fn is_exceeded(&self) -> bool {
1219 self.pool.read().is_exceeded()
1220 }
1221
1222 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1224 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1225 if let Err(err) = self.blob_store.insert(hash, blob) {
1226 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1227 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1228 }
1229 self.update_blob_store_metrics();
1230 }
1231
1232 pub fn delete_blob(&self, blob: TxHash) {
1234 let _ = self.blob_store.delete(blob);
1235 }
1236
1237 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1239 let _ = self.blob_store.delete_all(txs);
1240 }
1241
1242 pub fn cleanup_blobs(&self) {
1244 let stat = self.blob_store.cleanup();
1245 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1246 self.update_blob_store_metrics();
1247 }
1248
1249 fn update_blob_store_metrics(&self) {
1250 if let Some(data_size) = self.blob_store.data_size_hint() {
1251 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1252 }
1253 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1254 }
1255
1256 fn delete_discarded_blobs<'a>(
1258 &'a self,
1259 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1260 ) {
1261 let blob_txs = transactions
1262 .into_iter()
1263 .filter(|tx| tx.transaction.is_eip4844())
1264 .map(|tx| *tx.hash())
1265 .collect();
1266 self.delete_blobs(blob_txs);
1267 }
1268}
1269
1270impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1271 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1272 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1273 }
1274}
1275
1276#[derive(Debug)]
1281struct AddedTransactionMeta<T: PoolTransaction> {
1282 added: AddedTransaction<T>,
1284 blob_sidecar: Option<BlobTransactionSidecarVariant>,
1286}
1287
1288#[derive(Debug, Clone)]
1290pub struct AddedPendingTransaction<T: PoolTransaction> {
1291 pub transaction: Arc<ValidPoolTransaction<T>>,
1293 pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1295 pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1297 pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1299}
1300
1301impl<T: PoolTransaction> AddedPendingTransaction<T> {
1302 pub(crate) fn pending_transactions(
1308 &self,
1309 kind: TransactionListenerKind,
1310 ) -> impl Iterator<Item = B256> + '_ {
1311 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1312 PendingTransactionIter { kind, iter }
1313 }
1314}
1315
1316pub(crate) struct PendingTransactionIter<Iter> {
1317 kind: TransactionListenerKind,
1318 iter: Iter,
1319}
1320
1321impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1322where
1323 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1324 T: PoolTransaction + 'a,
1325{
1326 type Item = B256;
1327
1328 fn next(&mut self) -> Option<Self::Item> {
1329 loop {
1330 let next = self.iter.next()?;
1331 if self.kind.is_propagate_only() && !next.propagate {
1332 continue
1333 }
1334 return Some(*next.hash())
1335 }
1336 }
1337}
1338
1339pub(crate) struct FullPendingTransactionIter<Iter> {
1341 kind: TransactionListenerKind,
1342 iter: Iter,
1343}
1344
1345impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1346where
1347 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1348 T: PoolTransaction + 'a,
1349{
1350 type Item = NewTransactionEvent<T>;
1351
1352 fn next(&mut self) -> Option<Self::Item> {
1353 loop {
1354 let next = self.iter.next()?;
1355 if self.kind.is_propagate_only() && !next.propagate {
1356 continue
1357 }
1358 return Some(NewTransactionEvent {
1359 subpool: SubPool::Pending,
1360 transaction: next.clone(),
1361 })
1362 }
1363 }
1364}
1365
1366#[derive(Debug, Clone)]
1368pub enum AddedTransaction<T: PoolTransaction> {
1369 Pending(AddedPendingTransaction<T>),
1371 Parked {
1374 transaction: Arc<ValidPoolTransaction<T>>,
1376 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1378 subpool: SubPool,
1380 queued_reason: Option<QueuedReason>,
1382 },
1383}
1384
1385impl<T: PoolTransaction> AddedTransaction<T> {
1386 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1388 match self {
1389 Self::Pending(tx) => Some(tx),
1390 _ => None,
1391 }
1392 }
1393
1394 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1396 match self {
1397 Self::Pending(tx) => tx.replaced.as_ref(),
1398 Self::Parked { replaced, .. } => replaced.as_ref(),
1399 }
1400 }
1401
1402 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1404 match self {
1405 Self::Pending(tx) => Some(&tx.discarded),
1406 Self::Parked { .. } => None,
1407 }
1408 }
1409
1410 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1412 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1413 }
1414
1415 pub fn hash(&self) -> &TxHash {
1417 match self {
1418 Self::Pending(tx) => tx.transaction.hash(),
1419 Self::Parked { transaction, .. } => transaction.hash(),
1420 }
1421 }
1422
1423 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1425 match self {
1426 Self::Pending(tx) => {
1427 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1428 }
1429 Self::Parked { transaction, subpool, .. } => {
1430 NewTransactionEvent { transaction, subpool }
1431 }
1432 }
1433 }
1434
1435 pub(crate) const fn subpool(&self) -> SubPool {
1437 match self {
1438 Self::Pending(_) => SubPool::Pending,
1439 Self::Parked { subpool, .. } => *subpool,
1440 }
1441 }
1442
1443 #[cfg(test)]
1445 pub(crate) fn id(&self) -> &TransactionId {
1446 match self {
1447 Self::Pending(added) => added.transaction.id(),
1448 Self::Parked { transaction, .. } => transaction.id(),
1449 }
1450 }
1451
1452 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1454 match self {
1455 Self::Pending(_) => None,
1456 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1457 }
1458 }
1459
1460 pub fn transaction_state(&self) -> AddedTransactionState {
1462 match self.subpool() {
1463 SubPool::Pending => AddedTransactionState::Pending,
1464 _ => {
1465 if let Some(reason) = self.queued_reason() {
1468 AddedTransactionState::Queued(reason.clone())
1469 } else {
1470 AddedTransactionState::Queued(QueuedReason::NonceGap)
1472 }
1473 }
1474 }
1475 }
1476}
1477
/// The reason a transaction is queued rather than pending.
///
/// The conditions that select a variant are evaluated outside this section; the descriptions
/// below follow the variant names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// The transaction has a nonce gap.
    NonceGap,
    /// The transaction has parked ancestors.
    ParkedAncestors,
    /// The sender's balance is insufficient.
    InsufficientBalance,
    /// The transaction requires too much gas.
    TooMuchGas,
    /// The transaction does not satisfy the base fee.
    InsufficientBaseFee,
    /// The transaction does not satisfy the blob fee.
    InsufficientBlobFee,
}

/// The state of a transaction right after it was added to the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddedTransactionState {
    /// The transaction is pending.
    Pending,
    /// The transaction is queued for the given reason.
    Queued(QueuedReason),
}

impl AddedTransactionState {
    /// Returns `true` if the transaction is queued.
    pub const fn is_queued(&self) -> bool {
        self.queued_reason().is_some()
    }

    /// Returns `true` if the transaction is pending.
    pub const fn is_pending(&self) -> bool {
        match self {
            Self::Pending => true,
            Self::Queued(_) => false,
        }
    }

    /// Returns the queued reason, or `None` if the transaction is pending.
    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
        if let Self::Queued(reason) = self {
            Some(reason)
        } else {
            None
        }
    }
}
1523
1524#[derive(Debug, Clone, PartialEq, Eq)]
1526pub struct AddedTransactionOutcome {
1527 pub hash: TxHash,
1529 pub state: AddedTransactionState,
1531}
1532
1533impl AddedTransactionOutcome {
1534 pub const fn is_queued(&self) -> bool {
1536 self.state.is_queued()
1537 }
1538
1539 pub const fn is_pending(&self) -> bool {
1541 self.state.is_pending()
1542 }
1543}
1544
1545#[derive(Debug)]
1547pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1548 pub(crate) block_hash: B256,
1550 pub(crate) mined: Vec<TxHash>,
1552 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1554 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1556}
1557
1558impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1559 pub(crate) fn pending_transactions(
1565 &self,
1566 kind: TransactionListenerKind,
1567 ) -> impl Iterator<Item = B256> + '_ {
1568 let iter = self.promoted.iter();
1569 PendingTransactionIter { kind, iter }
1570 }
1571
1572 pub(crate) fn full_pending_transactions(
1578 &self,
1579 kind: TransactionListenerKind,
1580 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1581 let iter = self.promoted.iter();
1582 FullPendingTransactionIter { kind, iter }
1583 }
1584}
1585
1586#[cfg(test)]
1587mod tests {
1588 use crate::{
1589 blobstore::{BlobStore, InMemoryBlobStore},
1590 identifier::SenderId,
1591 test_utils::{MockTransaction, TestPoolBuilder},
1592 validate::ValidTransaction,
1593 BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1594 };
1595 use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1596 use alloy_primitives::Address;
1597 use std::{fs, path::PathBuf};
1598
1599 #[test]
1600 fn test_discard_blobs_on_blob_tx_eviction() {
1601 let blobs = {
1602 let json_content = fs::read_to_string(
1604 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1605 )
1606 .expect("Failed to read the blob data file");
1607
1608 let json_value: serde_json::Value =
1610 serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1611
1612 vec![
1614 json_value
1616 .get("data")
1617 .unwrap()
1618 .as_str()
1619 .expect("Data is not a valid string")
1620 .to_string(),
1621 ]
1622 };
1623
1624 let sidecar = BlobTransactionSidecarVariant::Eip4844(
1626 BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1627 );
1628
1629 let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1631
1632 let test_pool = &TestPoolBuilder::default()
1634 .with_config(PoolConfig { blob_limit, ..Default::default() })
1635 .pool;
1636
1637 test_pool
1639 .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1640
1641 let blob_store = InMemoryBlobStore::default();
1643
1644 for n in 0..blob_limit.max_txs + 10 {
1646 let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1648
1649 tx.set_size(1844674407370951);
1651
1652 if n < blob_limit.max_txs {
1654 blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1655 }
1656
1657 test_pool.add_transactions(
1659 TransactionOrigin::External,
1660 [TransactionValidationOutcome::Valid {
1661 balance: U256::from(1_000),
1662 state_nonce: 0,
1663 bytecode_hash: None,
1664 transaction: ValidTransaction::ValidWithSidecar {
1665 transaction: tx,
1666 sidecar: sidecar.clone(),
1667 },
1668 propagate: true,
1669 authorities: None,
1670 }],
1671 );
1672 }
1673
1674 assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1676
1677 assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1679
1680 assert_eq!(*test_pool.blob_store(), blob_store);
1682 }
1683
1684 #[test]
1685 fn test_auths_stored_in_identifiers() {
1686 let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1688
1689 let auth = Address::new([1; 20]);
1690 let tx = MockTransaction::eip7702();
1691
1692 test_pool.add_transactions(
1693 TransactionOrigin::Local,
1694 [TransactionValidationOutcome::Valid {
1695 balance: U256::from(1_000),
1696 state_nonce: 0,
1697 bytecode_hash: None,
1698 transaction: ValidTransaction::Valid(tx),
1699 propagate: true,
1700 authorities: Some(vec![auth]),
1701 }],
1702 );
1703
1704 let identifiers = test_pool.identifiers.read();
1705 assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1706 }
1707}