1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
93use reth_eth_wire_types::HandleMempoolData;
94use reth_execution_types::ChangedAccount;
95
96use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
97use reth_primitives_traits::Recovered;
98use rustc_hash::FxHashMap;
99use std::{
100 collections::HashSet,
101 fmt,
102 sync::{
103 atomic::{AtomicBool, Ordering},
104 Arc,
105 },
106 time::Instant,
107};
108use tokio::sync::mpsc;
109use tracing::{debug, trace, warn};
110mod events;
111pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
112pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
113pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
114pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
115pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
116pub use pending::PendingPool;
117
118mod best;
119pub use best::BestTransactions;
120
121mod blob;
122pub mod listener;
123mod parked;
124pub mod pending;
125pub mod size;
126pub(crate) mod state;
127pub mod txpool;
128mod update;
129
/// Bounded channel capacity intended for pending-transaction hash listeners.
///
/// NOTE(review): not referenced in this part of the file; `add_pending_listener` sizes its
/// channel from `PoolConfig::pending_tx_listener_buffer_size`, so this is presumably the default
/// for that setting — confirm against `PoolConfig`.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Bounded channel capacity intended for new-transaction listeners.
///
/// NOTE(review): not referenced in this part of the file; `add_new_transaction_listener` sizes
/// its channel from `PoolConfig::new_tx_listener_buffer_size`, so this is presumably the default
/// for that setting — confirm against `PoolConfig`.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the bounded channel created for every blob sidecar listener.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
136
/// Shared inner state of the transaction pool: the validator, the transaction storage, the blob
/// store and the registries of the different listener kinds.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Maps sender addresses to their pool-internal [`SenderId`].
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator; it is told about every new head block.
    validator: V,
    /// Storage for the sidecars of pooled blob transactions.
    blob_store: S,
    /// The pooled transactions, guarded by a read-write lock.
    pool: RwLock<TxPool<T>>,
    /// Pool settings; also supplies the listener channel capacities.
    config: PoolConfig,
    /// Broadcast hub for per-transaction and all-transactions event subscriptions.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Set when an event subscription is installed and cleared once `event_listener` reports
    /// itself empty, so emitters can skip taking the write lock when nobody listens.
    has_event_listeners: AtomicBool,
    /// Listeners that receive the hashes of transactions that became pending.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners that receive a [`NewTransactionEvent`] for transactions entering the pool.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listeners that receive the sidecars of newly added blob transactions.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics describing the blob store (entries, byte size, failed operations).
    blob_store_metrics: BlobStoreMetrics,
}
165
166impl<V, T, S> PoolInner<V, T, S>
169where
170 V: TransactionValidator,
171 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
172 S: BlobStore,
173{
174 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
176 Self {
177 identifiers: Default::default(),
178 validator,
179 event_listener: Default::default(),
180 has_event_listeners: AtomicBool::new(false),
181 pool: RwLock::new(TxPool::new(ordering, config.clone())),
182 pending_transaction_listener: Default::default(),
183 transaction_listener: Default::default(),
184 blob_transaction_sidecar_listener: Default::default(),
185 config,
186 blob_store,
187 blob_store_metrics: Default::default(),
188 }
189 }
190
191 pub const fn blob_store(&self) -> &S {
193 &self.blob_store
194 }
195
196 pub fn size(&self) -> PoolSize {
198 self.get_pool_data().size()
199 }
200
201 pub fn block_info(&self) -> BlockInfo {
203 self.get_pool_data().block_info()
204 }
205 pub fn set_block_info(&self, info: BlockInfo) {
207 self.pool.write().set_block_info(info)
208 }
209
210 pub fn get_sender_id(&self, addr: Address) -> SenderId {
212 self.identifiers.write().sender_id_or_create(addr)
213 }
214
215 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
217 self.identifiers.write().sender_ids_or_create(addrs)
218 }
219
220 pub fn unique_senders(&self) -> HashSet<Address> {
222 self.get_pool_data().unique_senders()
223 }
224
225 fn changed_senders(
228 &self,
229 accs: impl Iterator<Item = ChangedAccount>,
230 ) -> FxHashMap<SenderId, SenderInfo> {
231 let identifiers = self.identifiers.read();
232 accs.into_iter()
233 .filter_map(|acc| {
234 let ChangedAccount { address, nonce, balance } = acc;
235 let sender_id = identifiers.sender_id(&address)?;
236 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
237 })
238 .collect()
239 }
240
241 pub const fn config(&self) -> &PoolConfig {
243 &self.config
244 }
245
246 pub const fn validator(&self) -> &V {
248 &self.validator
249 }
250
251 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
254 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
255 let listener = PendingTransactionHashListener { sender, kind };
256
257 let mut listeners = self.pending_transaction_listener.write();
258 listeners.retain(|l| !l.sender.is_closed());
260 listeners.push(listener);
261
262 rx
263 }
264
265 pub fn add_new_transaction_listener(
267 &self,
268 kind: TransactionListenerKind,
269 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
270 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
271 let listener = TransactionListener { sender, kind };
272
273 let mut listeners = self.transaction_listener.write();
274 listeners.retain(|l| !l.sender.is_closed());
276 listeners.push(listener);
277
278 rx
279 }
280 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
283 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
284 let listener = BlobTransactionSidecarListener { sender };
285 self.blob_transaction_sidecar_listener.lock().push(listener);
286 rx
287 }
288
289 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
292 if !self.get_pool_data().contains(&tx_hash) {
293 return None
294 }
295 let mut listener = self.event_listener.write();
296 let events = listener.subscribe(tx_hash);
297 self.mark_event_listener_installed();
298 Some(events)
299 }
300
301 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
303 let mut listener = self.event_listener.write();
304 let events = listener.subscribe_all();
305 self.mark_event_listener_installed();
306 events
307 }
308
309 #[inline]
310 fn has_event_listeners(&self) -> bool {
311 self.has_event_listeners.load(Ordering::Relaxed)
312 }
313
314 #[inline]
315 fn mark_event_listener_installed(&self) {
316 self.has_event_listeners.store(true, Ordering::Relaxed);
317 }
318
319 #[inline]
320 fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
321 if listener.is_empty() {
322 self.has_event_listeners.store(false, Ordering::Relaxed);
323 }
324 }
325
326 #[inline]
327 fn with_event_listener<F>(&self, emit: F)
328 where
329 F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
330 {
331 if !self.has_event_listeners() {
332 return
333 }
334 let mut listener = self.event_listener.write();
335 if !listener.is_empty() {
336 emit(&mut listener);
337 }
338 self.update_event_listener_state(&listener);
339 }
340
341 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
343 self.pool.read()
344 }
345
346 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
348 let mut out = Vec::new();
349 self.append_pooled_transactions(&mut out);
350 out
351 }
352
353 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
355 let mut out = Vec::new();
356 self.append_pooled_transactions_hashes(&mut out);
357 out
358 }
359
360 pub fn pooled_transactions_max(
362 &self,
363 max: usize,
364 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
365 let mut out = Vec::new();
366 self.append_pooled_transactions_max(max, &mut out);
367 out
368 }
369
370 pub fn append_pooled_transactions(
372 &self,
373 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
374 ) {
375 out.extend(
376 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
377 );
378 }
379
380 pub fn append_pooled_transaction_elements(
383 &self,
384 tx_hashes: &[TxHash],
385 limit: GetPooledTransactionLimit,
386 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
387 ) where
388 <V as TransactionValidator>::Transaction: EthPoolTransaction,
389 {
390 let transactions = self.get_all_propagatable(tx_hashes);
391 let mut size = 0;
392 for transaction in transactions {
393 let encoded_len = transaction.encoded_length();
394 let Some(pooled) = self.to_pooled_transaction(transaction) else {
395 continue;
396 };
397
398 size += encoded_len;
399 out.push(pooled.into_inner());
400
401 if limit.exceeds(size) {
402 break
403 }
404 }
405 }
406
407 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
410 out.extend(
411 self.get_pool_data()
412 .all()
413 .transactions_iter()
414 .filter(|tx| tx.propagate)
415 .map(|tx| *tx.hash()),
416 );
417 }
418
419 pub fn append_pooled_transactions_max(
422 &self,
423 max: usize,
424 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
425 ) {
426 out.extend(
427 self.get_pool_data()
428 .all()
429 .transactions_iter()
430 .filter(|tx| tx.propagate)
431 .take(max)
432 .cloned(),
433 );
434 }
435
436 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
438 if max == 0 {
439 return Vec::new();
440 }
441 self.get_pool_data()
442 .all()
443 .transactions_iter()
444 .filter(|tx| tx.propagate)
445 .take(max)
446 .map(|tx| *tx.hash())
447 .collect()
448 }
449
450 fn to_pooled_transaction(
455 &self,
456 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
457 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
458 where
459 <V as TransactionValidator>::Transaction: EthPoolTransaction,
460 {
461 if transaction.is_eip4844() {
462 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
463 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
464 } else {
465 transaction
466 .transaction
467 .clone_into_pooled()
468 .inspect_err(|err| {
469 debug!(
470 target: "txpool", %err,
471 "failed to convert transaction to pooled element; skipping",
472 );
473 })
474 .ok()
475 }
476 }
477
478 pub fn get_pooled_transaction_elements(
481 &self,
482 tx_hashes: Vec<TxHash>,
483 limit: GetPooledTransactionLimit,
484 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
485 where
486 <V as TransactionValidator>::Transaction: EthPoolTransaction,
487 {
488 let mut elements = Vec::new();
489 self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
490 elements.shrink_to_fit();
491 elements
492 }
493
494 pub fn get_pooled_transaction_element(
496 &self,
497 tx_hash: TxHash,
498 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
499 where
500 <V as TransactionValidator>::Transaction: EthPoolTransaction,
501 {
502 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
503 }
504
505 pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
507 trace!(target: "txpool", ?update, "updating pool on canonical state change");
508
509 let block_info = update.block_info();
510 let CanonicalStateUpdate {
511 new_tip, changed_accounts, mined_transactions, update_kind, ..
512 } = update;
513 self.validator.on_new_head_block(new_tip);
514
515 let changed_senders = self.changed_senders(changed_accounts.into_iter());
516
517 let outcome = self.pool.write().on_canonical_state_change(
519 block_info,
520 mined_transactions,
521 changed_senders,
522 update_kind,
523 );
524
525 self.delete_discarded_blobs(outcome.discarded.iter());
527
528 self.notify_on_new_state(outcome);
530 }
531
532 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
538 let changed_senders = self.changed_senders(accounts.into_iter());
539 let UpdateOutcome { promoted, discarded } =
540 self.pool.write().update_accounts(changed_senders);
541
542 self.notify_on_transaction_updates(promoted, discarded);
543 }
544
545 fn add_transaction(
553 &self,
554 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
555 origin: TransactionOrigin,
556 tx: TransactionValidationOutcome<T::Transaction>,
557 ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
558 match tx {
559 TransactionValidationOutcome::Valid {
560 balance,
561 state_nonce,
562 transaction,
563 propagate,
564 bytecode_hash,
565 authorities,
566 } => {
567 let sender_id = self.get_sender_id(transaction.sender());
568 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
569
570 let (transaction, blob_sidecar) = match transaction {
572 ValidTransaction::Valid(tx) => (tx, None),
573 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
574 debug_assert!(
575 transaction.is_eip4844(),
576 "validator returned sidecar for non EIP-4844 transaction"
577 );
578 (transaction, Some(sidecar))
579 }
580 };
581
582 let tx = ValidPoolTransaction {
583 transaction,
584 transaction_id,
585 propagate,
586 timestamp: Instant::now(),
587 origin,
588 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
589 };
590
591 let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
592 Ok(added) => added,
593 Err(err) => return (Err(err), None),
594 };
595 let hash = *added.hash();
596 let state = added.transaction_state();
597
598 let meta = AddedTransactionMeta { added, blob_sidecar };
599
600 (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
601 }
602 TransactionValidationOutcome::Invalid(tx, err) => {
603 self.with_event_listener(|listener| listener.invalid(tx.hash()));
604 (Err(PoolError::new(*tx.hash(), err)), None)
605 }
606 TransactionValidationOutcome::Error(tx_hash, err) => {
607 self.with_event_listener(|listener| listener.discarded(&tx_hash));
608 (Err(PoolError::other(tx_hash, err)), None)
609 }
610 }
611 }
612
613 pub fn add_transaction_and_subscribe(
615 &self,
616 origin: TransactionOrigin,
617 tx: TransactionValidationOutcome<T::Transaction>,
618 ) -> PoolResult<TransactionEvents> {
619 let listener = {
620 let mut listener = self.event_listener.write();
621 let events = listener.subscribe(tx.tx_hash());
622 self.mark_event_listener_installed();
623 events
624 };
625 let mut results = self.add_transactions(origin, std::iter::once(tx));
626 results.pop().expect("result length is the same as the input")?;
627 Ok(listener)
628 }
629
630 pub fn add_transactions(
632 &self,
633 origin: TransactionOrigin,
634 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
635 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
636 let (mut results, added_metas, discarded) = {
638 let mut pool = self.pool.write();
639 let mut added_metas = Vec::new();
640
641 let results = transactions
642 .into_iter()
643 .map(|tx| {
644 let (result, meta) = self.add_transaction(&mut pool, origin, tx);
645
646 if result.is_ok() &&
648 let Some(meta) = meta
649 {
650 added_metas.push(meta);
651 }
652
653 result
654 })
655 .collect::<Vec<_>>();
656
657 let discarded = if results.iter().any(Result::is_ok) {
659 pool.discard_worst()
660 } else {
661 Default::default()
662 };
663
664 (results, added_metas, discarded)
665 };
666
667 for meta in added_metas {
668 self.on_added_transaction(meta);
669 }
670
671 if !discarded.is_empty() {
672 self.delete_discarded_blobs(discarded.iter());
674 self.with_event_listener(|listener| listener.discarded_many(&discarded));
675
676 let discarded_hashes =
677 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
678
679 for res in &mut results {
682 if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
683 discarded_hashes.contains(hash)
684 {
685 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
686 }
687 }
688 };
689
690 results
691 }
692
693 fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
698 if let Some(sidecar) = meta.blob_sidecar {
700 let hash = *meta.added.hash();
701 self.on_new_blob_sidecar(&hash, &sidecar);
702 self.insert_blob(hash, sidecar);
703 }
704
705 if let Some(replaced) = meta.added.replaced_blob_transaction() {
707 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
708 self.delete_blob(replaced);
709 }
710
711 if let Some(discarded) = meta.added.discarded_transactions() {
713 self.delete_discarded_blobs(discarded.iter());
714 }
715
716 if let Some(pending) = meta.added.as_pending() {
718 self.on_new_pending_transaction(pending);
719 }
720
721 self.notify_event_listeners(&meta.added);
723
724 self.on_new_transaction(meta.added.into_new_transaction_event());
726 }
727
728 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
737 let mut needs_cleanup = false;
738
739 {
740 let listeners = self.pending_transaction_listener.read();
741 for listener in listeners.iter() {
742 if !listener.send_all(pending.pending_transactions(listener.kind)) {
743 needs_cleanup = true;
744 }
745 }
746 }
747
748 if needs_cleanup {
750 self.pending_transaction_listener
751 .write()
752 .retain(|listener| !listener.sender.is_closed());
753 }
754 }
755
756 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
765 let mut needs_cleanup = false;
766
767 {
768 let listeners = self.transaction_listener.read();
769 for listener in listeners.iter() {
770 if listener.kind.is_propagate_only() && !event.transaction.propagate {
771 if listener.sender.is_closed() {
772 needs_cleanup = true;
773 }
774 continue
776 }
777
778 if !listener.send(event.clone()) {
779 needs_cleanup = true;
780 }
781 }
782 }
783
784 if needs_cleanup {
786 self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
787 }
788 }
789
790 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
792 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
793 if sidecar_listeners.is_empty() {
794 return
795 }
796 let sidecar = Arc::new(sidecar.clone());
797 sidecar_listeners.retain_mut(|listener| {
798 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
799 match listener.sender.try_send(new_blob_event) {
800 Ok(()) => true,
801 Err(err) => {
802 if matches!(err, mpsc::error::TrySendError::Full(_)) {
803 debug!(
804 target: "txpool",
805 "[{:?}] failed to send blob sidecar; channel full",
806 sidecar,
807 );
808 true
809 } else {
810 false
811 }
812 }
813 }
814 })
815 }
816
817 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
819 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
820
821 let mut needs_pending_cleanup = false;
823 {
824 let listeners = self.pending_transaction_listener.read();
825 for listener in listeners.iter() {
826 if !listener.send_all(outcome.pending_transactions(listener.kind)) {
827 needs_pending_cleanup = true;
828 }
829 }
830 }
831 if needs_pending_cleanup {
832 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
833 }
834
835 let mut needs_tx_cleanup = false;
837 {
838 let listeners = self.transaction_listener.read();
839 for listener in listeners.iter() {
840 if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
841 needs_tx_cleanup = true;
842 }
843 }
844 }
845 if needs_tx_cleanup {
846 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
847 }
848
849 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
850
851 self.with_event_listener(|listener| {
853 for tx in &mined {
854 listener.mined(tx, block_hash);
855 }
856 for tx in &promoted {
857 listener.pending(tx.hash(), None);
858 }
859 for tx in &discarded {
860 listener.discarded(tx.hash());
861 }
862 })
863 }
864
865 #[allow(clippy::type_complexity)]
874 pub fn notify_on_transaction_updates(
875 &self,
876 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
877 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
878 ) {
879 if !promoted.is_empty() {
881 let mut needs_pending_cleanup = false;
882 {
883 let listeners = self.pending_transaction_listener.read();
884 for listener in listeners.iter() {
885 let promoted_hashes = promoted.iter().filter_map(|tx| {
886 if listener.kind.is_propagate_only() && !tx.propagate {
887 None
888 } else {
889 Some(*tx.hash())
890 }
891 });
892 if !listener.send_all(promoted_hashes) {
893 needs_pending_cleanup = true;
894 }
895 }
896 }
897 if needs_pending_cleanup {
898 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
899 }
900
901 let mut needs_tx_cleanup = false;
903 {
904 let listeners = self.transaction_listener.read();
905 for listener in listeners.iter() {
906 let promoted_txs = promoted.iter().filter_map(|tx| {
907 if listener.kind.is_propagate_only() && !tx.propagate {
908 None
909 } else {
910 Some(NewTransactionEvent::pending(tx.clone()))
911 }
912 });
913 if !listener.send_all(promoted_txs) {
914 needs_tx_cleanup = true;
915 }
916 }
917 }
918 if needs_tx_cleanup {
919 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
920 }
921 }
922
923 self.with_event_listener(|listener| {
924 for tx in &promoted {
925 listener.pending(tx.hash(), None);
926 }
927 for tx in &discarded {
928 listener.discarded(tx.hash());
929 }
930 });
931
932 if !discarded.is_empty() {
933 self.delete_discarded_blobs(discarded.iter());
936 }
937 }
938
939 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
948 self.with_event_listener(|listener| match tx {
949 AddedTransaction::Pending(tx) => {
950 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
951
952 listener.pending(transaction.hash(), replaced.clone());
953 for tx in promoted {
954 listener.pending(tx.hash(), None);
955 }
956 for tx in discarded {
957 listener.discarded(tx.hash());
958 }
959 }
960 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
961 listener.queued(transaction.hash(), queued_reason.clone());
962 if let Some(replaced) = replaced {
963 listener.replaced(replaced.clone(), *transaction.hash());
964 }
965 }
966 });
967 }
968
969 pub fn best_transactions(&self) -> BestTransactions<T> {
971 self.get_pool_data().best_transactions()
972 }
973
974 pub fn best_transactions_with_attributes(
977 &self,
978 best_transactions_attributes: BestTransactionsAttributes,
979 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
980 {
981 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
982 }
983
984 pub fn pending_transactions_max(
986 &self,
987 max: usize,
988 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
989 self.get_pool_data().pending_transactions_iter().take(max).collect()
990 }
991
992 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
994 self.get_pool_data().pending_transactions()
995 }
996
997 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
999 self.get_pool_data().queued_transactions()
1000 }
1001
1002 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1004 let pool = self.get_pool_data();
1005 AllPoolTransactions {
1006 pending: pool.pending_transactions(),
1007 queued: pool.queued_transactions(),
1008 }
1009 }
1010
1011 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1013 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1014 }
1015
1016 pub fn remove_transactions(
1021 &self,
1022 hashes: Vec<TxHash>,
1023 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1024 if hashes.is_empty() {
1025 return Vec::new()
1026 }
1027 let removed = self.pool.write().remove_transactions(hashes);
1028
1029 self.with_event_listener(|listener| listener.discarded_many(&removed));
1030
1031 removed
1032 }
1033
1034 pub fn remove_transactions_and_descendants(
1037 &self,
1038 hashes: Vec<TxHash>,
1039 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1040 if hashes.is_empty() {
1041 return Vec::new()
1042 }
1043 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1044
1045 self.with_event_listener(|listener| {
1046 for tx in &removed {
1047 listener.discarded(tx.hash());
1048 }
1049 });
1050
1051 removed
1052 }
1053
1054 pub fn remove_transactions_by_sender(
1056 &self,
1057 sender: Address,
1058 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1059 let sender_id = self.get_sender_id(sender);
1060 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1061
1062 self.with_event_listener(|listener| listener.discarded_many(&removed));
1063
1064 removed
1065 }
1066
1067 pub fn retain_unknown<A>(&self, announcement: &mut A)
1069 where
1070 A: HandleMempoolData,
1071 {
1072 if announcement.is_empty() {
1073 return
1074 }
1075 let pool = self.get_pool_data();
1076 announcement.retain_by_hash(|tx| !pool.contains(tx))
1077 }
1078
1079 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1081 self.get_pool_data().get(tx_hash)
1082 }
1083
1084 pub fn get_transactions_by_sender(
1086 &self,
1087 sender: Address,
1088 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1089 let sender_id = self.get_sender_id(sender);
1090 self.get_pool_data().get_transactions_by_sender(sender_id)
1091 }
1092
1093 pub fn get_queued_transactions_by_sender(
1095 &self,
1096 sender: Address,
1097 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1098 let sender_id = self.get_sender_id(sender);
1099 self.get_pool_data().queued_txs_by_sender(sender_id)
1100 }
1101
1102 pub fn pending_transactions_with_predicate(
1104 &self,
1105 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1106 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1107 self.get_pool_data().pending_transactions_with_predicate(predicate)
1108 }
1109
1110 pub fn get_pending_transactions_by_sender(
1112 &self,
1113 sender: Address,
1114 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1115 let sender_id = self.get_sender_id(sender);
1116 self.get_pool_data().pending_txs_by_sender(sender_id)
1117 }
1118
1119 pub fn get_highest_transaction_by_sender(
1121 &self,
1122 sender: Address,
1123 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1124 let sender_id = self.get_sender_id(sender);
1125 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1126 }
1127
1128 pub fn get_highest_consecutive_transaction_by_sender(
1130 &self,
1131 sender: Address,
1132 on_chain_nonce: u64,
1133 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1134 let sender_id = self.get_sender_id(sender);
1135 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1136 sender_id.into_transaction_id(on_chain_nonce),
1137 )
1138 }
1139
1140 pub fn get_transaction_by_transaction_id(
1142 &self,
1143 transaction_id: &TransactionId,
1144 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1145 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1146 }
1147
1148 pub fn get_transactions_by_origin(
1150 &self,
1151 origin: TransactionOrigin,
1152 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1153 self.get_pool_data()
1154 .all()
1155 .transactions_iter()
1156 .filter(|tx| tx.origin == origin)
1157 .cloned()
1158 .collect()
1159 }
1160
1161 pub fn get_pending_transactions_by_origin(
1163 &self,
1164 origin: TransactionOrigin,
1165 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1166 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1167 }
1168
1169 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1173 if txs.is_empty() {
1174 return Vec::new()
1175 }
1176 self.get_pool_data().get_all(txs).collect()
1177 }
1178
1179 fn get_all_propagatable(
1183 &self,
1184 txs: &[TxHash],
1185 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1186 if txs.is_empty() {
1187 return Vec::new()
1188 }
1189 let pool = self.get_pool_data();
1190 txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1191 }
1192
1193 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1195 if txs.0.is_empty() {
1196 return
1197 }
1198 self.with_event_listener(|listener| {
1199 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1200 });
1201 }
1202
1203 pub fn len(&self) -> usize {
1205 self.get_pool_data().len()
1206 }
1207
1208 pub fn is_empty(&self) -> bool {
1210 self.get_pool_data().is_empty()
1211 }
1212
1213 pub fn is_exceeded(&self) -> bool {
1215 self.pool.read().is_exceeded()
1216 }
1217
1218 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1220 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1221 if let Err(err) = self.blob_store.insert(hash, blob) {
1222 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1223 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1224 }
1225 self.update_blob_store_metrics();
1226 }
1227
1228 pub fn delete_blob(&self, blob: TxHash) {
1230 let _ = self.blob_store.delete(blob);
1231 }
1232
1233 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1235 let _ = self.blob_store.delete_all(txs);
1236 }
1237
1238 pub fn cleanup_blobs(&self) {
1240 let stat = self.blob_store.cleanup();
1241 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1242 self.update_blob_store_metrics();
1243 }
1244
1245 fn update_blob_store_metrics(&self) {
1246 if let Some(data_size) = self.blob_store.data_size_hint() {
1247 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1248 }
1249 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1250 }
1251
1252 fn delete_discarded_blobs<'a>(
1254 &'a self,
1255 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1256 ) {
1257 let blob_txs = transactions
1258 .into_iter()
1259 .filter(|tx| tx.transaction.is_eip4844())
1260 .map(|tx| *tx.hash())
1261 .collect();
1262 self.delete_blobs(blob_txs);
1263 }
1264}
1265
1266impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1268 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1269 }
1270}
1271
/// Result of a successful insertion, bundled with the data needed for the follow-up work that
/// runs after the pool lock has been released.
#[derive(Debug)]
struct AddedTransactionMeta<T: PoolTransaction> {
    /// Describes how the transaction was added to the pool.
    added: AddedTransaction<T>,
    /// Sidecar that came with the transaction, if the validator returned one.
    blob_sidecar: Option<BlobTransactionSidecarVariant>,
}
1283
1284#[derive(Debug, Clone)]
1286pub struct AddedPendingTransaction<T: PoolTransaction> {
1287 pub transaction: Arc<ValidPoolTransaction<T>>,
1289 pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1291 pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1293 pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1295}
1296
1297impl<T: PoolTransaction> AddedPendingTransaction<T> {
1298 pub(crate) fn pending_transactions(
1304 &self,
1305 kind: TransactionListenerKind,
1306 ) -> impl Iterator<Item = B256> + '_ {
1307 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1308 PendingTransactionIter { kind, iter }
1309 }
1310}
1311
1312pub(crate) struct PendingTransactionIter<Iter> {
1313 kind: TransactionListenerKind,
1314 iter: Iter,
1315}
1316
1317impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1318where
1319 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1320 T: PoolTransaction + 'a,
1321{
1322 type Item = B256;
1323
1324 fn next(&mut self) -> Option<Self::Item> {
1325 loop {
1326 let next = self.iter.next()?;
1327 if self.kind.is_propagate_only() && !next.propagate {
1328 continue
1329 }
1330 return Some(*next.hash())
1331 }
1332 }
1333}
1334
1335pub(crate) struct FullPendingTransactionIter<Iter> {
1337 kind: TransactionListenerKind,
1338 iter: Iter,
1339}
1340
1341impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1342where
1343 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1344 T: PoolTransaction + 'a,
1345{
1346 type Item = NewTransactionEvent<T>;
1347
1348 fn next(&mut self) -> Option<Self::Item> {
1349 loop {
1350 let next = self.iter.next()?;
1351 if self.kind.is_propagate_only() && !next.propagate {
1352 continue
1353 }
1354 return Some(NewTransactionEvent {
1355 subpool: SubPool::Pending,
1356 transaction: next.clone(),
1357 })
1358 }
1359 }
1360}
1361
1362#[derive(Debug, Clone)]
1364pub enum AddedTransaction<T: PoolTransaction> {
1365 Pending(AddedPendingTransaction<T>),
1367 Parked {
1370 transaction: Arc<ValidPoolTransaction<T>>,
1372 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1374 subpool: SubPool,
1376 queued_reason: Option<QueuedReason>,
1378 },
1379}
1380
1381impl<T: PoolTransaction> AddedTransaction<T> {
1382 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1384 match self {
1385 Self::Pending(tx) => Some(tx),
1386 _ => None,
1387 }
1388 }
1389
1390 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1392 match self {
1393 Self::Pending(tx) => tx.replaced.as_ref(),
1394 Self::Parked { replaced, .. } => replaced.as_ref(),
1395 }
1396 }
1397
1398 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1400 match self {
1401 Self::Pending(tx) => Some(&tx.discarded),
1402 Self::Parked { .. } => None,
1403 }
1404 }
1405
1406 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1408 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1409 }
1410
1411 pub fn hash(&self) -> &TxHash {
1413 match self {
1414 Self::Pending(tx) => tx.transaction.hash(),
1415 Self::Parked { transaction, .. } => transaction.hash(),
1416 }
1417 }
1418
1419 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1421 match self {
1422 Self::Pending(tx) => {
1423 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1424 }
1425 Self::Parked { transaction, subpool, .. } => {
1426 NewTransactionEvent { transaction, subpool }
1427 }
1428 }
1429 }
1430
1431 pub(crate) const fn subpool(&self) -> SubPool {
1433 match self {
1434 Self::Pending(_) => SubPool::Pending,
1435 Self::Parked { subpool, .. } => *subpool,
1436 }
1437 }
1438
1439 #[cfg(test)]
1441 pub(crate) fn id(&self) -> &TransactionId {
1442 match self {
1443 Self::Pending(added) => added.transaction.id(),
1444 Self::Parked { transaction, .. } => transaction.id(),
1445 }
1446 }
1447
1448 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1450 match self {
1451 Self::Pending(_) => None,
1452 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1453 }
1454 }
1455
1456 pub fn transaction_state(&self) -> AddedTransactionState {
1458 match self.subpool() {
1459 SubPool::Pending => AddedTransactionState::Pending,
1460 _ => {
1461 if let Some(reason) = self.queued_reason() {
1464 AddedTransactionState::Queued(reason.clone())
1465 } else {
1466 AddedTransactionState::Queued(QueuedReason::NonceGap)
1468 }
1469 }
1470 }
1471 }
1472}
1473
/// Reason attached to a queued transaction, see [`AddedTransactionState::Queued`].
///
/// NOTE(review): the variant descriptions below paraphrase the variant names; the code that
/// assigns each reason lives outside this section and should be consulted for exact semantics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// There is a gap in the nonce sequence before this transaction.
    NonceGap,
    /// Ancestor transactions of this transaction are parked.
    ParkedAncestors,
    /// The balance is insufficient for the transaction.
    InsufficientBalance,
    /// The transaction requires too much gas.
    TooMuchGas,
    /// The transaction does not satisfy the base fee.
    InsufficientBaseFee,
    /// The transaction does not satisfy the blob fee.
    InsufficientBlobFee,
}
1490
1491#[derive(Debug, Clone, PartialEq, Eq)]
1493pub enum AddedTransactionState {
1494 Pending,
1496 Queued(QueuedReason),
1498}
1499
1500impl AddedTransactionState {
1501 pub const fn is_queued(&self) -> bool {
1503 matches!(self, Self::Queued(_))
1504 }
1505
1506 pub const fn is_pending(&self) -> bool {
1508 matches!(self, Self::Pending)
1509 }
1510
1511 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1513 match self {
1514 Self::Queued(reason) => Some(reason),
1515 Self::Pending => None,
1516 }
1517 }
1518}
1519
1520#[derive(Debug, Clone, PartialEq, Eq)]
1522pub struct AddedTransactionOutcome {
1523 pub hash: TxHash,
1525 pub state: AddedTransactionState,
1527}
1528
1529impl AddedTransactionOutcome {
1530 pub const fn is_queued(&self) -> bool {
1532 self.state.is_queued()
1533 }
1534
1535 pub const fn is_pending(&self) -> bool {
1537 self.state.is_pending()
1538 }
1539}
1540
1541#[derive(Debug)]
1543pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1544 pub(crate) block_hash: B256,
1546 pub(crate) mined: Vec<TxHash>,
1548 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1550 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1552}
1553
1554impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1555 pub(crate) fn pending_transactions(
1561 &self,
1562 kind: TransactionListenerKind,
1563 ) -> impl Iterator<Item = B256> + '_ {
1564 let iter = self.promoted.iter();
1565 PendingTransactionIter { kind, iter }
1566 }
1567
1568 pub(crate) fn full_pending_transactions(
1574 &self,
1575 kind: TransactionListenerKind,
1576 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1577 let iter = self.promoted.iter();
1578 FullPendingTransactionIter { kind, iter }
1579 }
1580}
1581
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool accepts and checks that the pool's
    /// blob store ends up holding exactly the sidecars of the first `max_txs` transactions.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Read the hex-encoded blob from the JSON fixture shipped with the crate.
        let fixture_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json");
        let raw_fixture =
            fs::read_to_string(fixture_path).expect("Failed to read the blob data file");
        let fixture: serde_json::Value =
            serde_json::from_str(&raw_fixture).expect("Failed to deserialize JSON");
        let blob_hex = fixture
            .get("data")
            .unwrap()
            .as_str()
            .expect("Data is not a valid string")
            .to_string();

        // Build a single-blob sidecar that every transaction in this test shares.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(vec![blob_hex]).unwrap(),
        );

        // Limit the blob sub-pool to 1000 transactions, with no effective size limit.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
        let pool_config = PoolConfig { blob_limit, ..Default::default() };
        let builder = TestPoolBuilder::default().with_config(pool_config);
        let test_pool = &builder.pool;

        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Reference store: mirrors what the pool's blob store is expected to contain.
        let blob_store = InMemoryBlobStore::default();

        // Insert ten transactions more than the limit allows.
        let attempts = blob_limit.max_txs + 10;
        for attempt in 0..attempts {
            let mut blob_tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Give the transaction a non-zero size so `blob_size` can be asserted below.
            blob_tx.set_size(1_844_674_407_370_951);

            // Only transactions within the limit are expected to keep their sidecar.
            let within_limit = attempt < blob_limit.max_txs;
            if within_limit {
                blob_store.insert(*blob_tx.get_hash(), sidecar.clone()).unwrap();
            }

            let outcome = TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::ValidWithSidecar {
                    transaction: blob_tx,
                    sidecar: sidecar.clone(),
                },
                propagate: true,
                authorities: None,
            };
            test_pool.add_transactions(TransactionOrigin::External, [outcome]);
        }

        // The blob sub-pool is capped at the configured transaction limit.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // 1000 transactions of the size set above.
        assert_eq!(test_pool.size().blob_size, 1_844_674_407_370_951_000);

        // The pool's blob store matches the reference store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    /// Adds an EIP-7702 transaction with one authority and checks that the authority address
    /// is registered in the pool's sender identifiers.
    #[test]
    fn test_auths_stored_in_identifiers() {
        let builder = TestPoolBuilder::default().with_config(Default::default());
        let test_pool = &builder.pool;

        let authority = Address::new([1; 20]);
        let set_code_tx = MockTransaction::eip7702();

        let outcome = TransactionValidationOutcome::Valid {
            balance: U256::from(1_000),
            state_nonce: 0,
            bytecode_hash: None,
            transaction: ValidTransaction::Valid(set_code_tx),
            propagate: true,
            authorities: Some(vec![authority]),
        };
        test_pool.add_transactions(TransactionOrigin::Local, [outcome]);

        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&authority), Some(SenderId::from(1)));
    }
}