1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{
92 map::{AddressSet, HashSet},
93 Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103 fmt,
104 sync::{
105 atomic::{AtomicBool, Ordering},
106 Arc,
107 },
108 time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
// Submodules of the pool and the items re-exported from them.
mod events;
pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
pub use pending::PendingPool;

mod best;
pub use best::BestTransactions;

mod blob;
pub mod listener;
mod parked;
pub mod pending;
pub mod size;
pub(crate) mod state;
pub mod txpool;
mod update;
131
/// Buffer size for pending-transaction listener channels.
///
/// NOTE(review): not referenced in this part of the file; `add_pending_listener` sizes its
/// channel from `PoolConfig::pending_tx_listener_buffer_size`. Presumably this is the default
/// for that setting; confirm.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Buffer size for new-transaction listener channels.
///
/// NOTE(review): not referenced in this part of the file; `add_new_transaction_listener` sizes
/// its channel from `PoolConfig::new_tx_listener_buffer_size`. Presumably this is the default
/// for that setting; confirm.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of every channel created by `add_blob_sidecar_listener`.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
/// Shared inner state of the transaction pool: the pool itself together with the validator, the
/// blob store and all registered listeners.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Maps sender addresses to [`SenderId`]s; ids are created on demand by `get_sender_id`.
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator; it is also told about new head blocks in
    /// `on_canonical_state_change`.
    validator: V,
    /// Storage for the blob sidecars of EIP-4844 transactions.
    blob_store: S,
    /// The transaction pool, guarded by a read/write lock.
    pool: RwLock<TxPool<T>>,
    /// Pool settings; a clone is handed to the inner `TxPool` on construction.
    config: PoolConfig,
    /// Broadcaster for per-transaction and all-transaction event subscriptions.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Set once an event subscription is installed and cleared when the broadcaster reports it is
    /// empty, so `with_event_listener` can return without touching the `event_listener` lock.
    has_event_listeners: AtomicBool,
    /// Listeners that receive the hashes of transactions that became pending.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners that receive full [`NewTransactionEvent`]s.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listeners that receive the sidecar of every newly added blob transaction.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics describing the blob store (failed inserts/deletes, size, entry count).
    blob_store_metrics: BlobStoreMetrics,
}
167
168impl<V, T, S> PoolInner<V, T, S>
171where
172 V: TransactionValidator,
173 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174 S: BlobStore,
175{
176 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178 Self {
179 identifiers: Default::default(),
180 validator,
181 event_listener: Default::default(),
182 has_event_listeners: AtomicBool::new(false),
183 pool: RwLock::new(TxPool::new(ordering, config.clone())),
184 pending_transaction_listener: Default::default(),
185 transaction_listener: Default::default(),
186 blob_transaction_sidecar_listener: Default::default(),
187 config,
188 blob_store,
189 blob_store_metrics: Default::default(),
190 }
191 }
192
193 pub const fn blob_store(&self) -> &S {
195 &self.blob_store
196 }
197
198 pub fn size(&self) -> PoolSize {
200 self.get_pool_data().size()
201 }
202
203 pub fn block_info(&self) -> BlockInfo {
205 self.get_pool_data().block_info()
206 }
207 pub fn set_block_info(&self, info: BlockInfo) {
209 self.pool.write().set_block_info(info)
210 }
211
212 pub fn get_sender_id(&self, addr: Address) -> SenderId {
214 self.identifiers.write().sender_id_or_create(addr)
215 }
216
217 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
219 self.identifiers.write().sender_ids_or_create(addrs)
220 }
221
222 pub fn unique_senders(&self) -> AddressSet {
224 self.get_pool_data().unique_senders()
225 }
226
227 fn changed_senders(
230 &self,
231 accs: impl Iterator<Item = ChangedAccount>,
232 ) -> FxHashMap<SenderId, SenderInfo> {
233 let identifiers = self.identifiers.read();
234 accs.into_iter()
235 .filter_map(|acc| {
236 let ChangedAccount { address, nonce, balance } = acc;
237 let sender_id = identifiers.sender_id(&address)?;
238 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
239 })
240 .collect()
241 }
242
243 pub const fn config(&self) -> &PoolConfig {
245 &self.config
246 }
247
248 pub const fn validator(&self) -> &V {
250 &self.validator
251 }
252
253 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
256 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
257 let listener = PendingTransactionHashListener { sender, kind };
258
259 let mut listeners = self.pending_transaction_listener.write();
260 listeners.retain(|l| !l.sender.is_closed());
262 listeners.push(listener);
263
264 rx
265 }
266
267 pub fn add_new_transaction_listener(
269 &self,
270 kind: TransactionListenerKind,
271 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
272 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
273 let listener = TransactionListener { sender, kind };
274
275 let mut listeners = self.transaction_listener.write();
276 listeners.retain(|l| !l.sender.is_closed());
278 listeners.push(listener);
279
280 rx
281 }
282 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
285 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
286 let listener = BlobTransactionSidecarListener { sender };
287 self.blob_transaction_sidecar_listener.lock().push(listener);
288 rx
289 }
290
291 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
294 if !self.get_pool_data().contains(&tx_hash) {
295 return None
296 }
297 let mut listener = self.event_listener.write();
298 let events = listener.subscribe(tx_hash);
299 self.mark_event_listener_installed();
300 Some(events)
301 }
302
303 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
305 let mut listener = self.event_listener.write();
306 let events = listener.subscribe_all();
307 self.mark_event_listener_installed();
308 events
309 }
310
311 #[inline]
312 fn has_event_listeners(&self) -> bool {
313 self.has_event_listeners.load(Ordering::Relaxed)
314 }
315
316 #[inline]
317 fn mark_event_listener_installed(&self) {
318 self.has_event_listeners.store(true, Ordering::Relaxed);
319 }
320
321 #[inline]
322 fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
323 if listener.is_empty() {
324 self.has_event_listeners.store(false, Ordering::Relaxed);
325 }
326 }
327
328 #[inline]
329 fn with_event_listener<F>(&self, emit: F)
330 where
331 F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
332 {
333 if !self.has_event_listeners() {
334 return
335 }
336 let mut listener = self.event_listener.write();
337 if !listener.is_empty() {
338 emit(&mut listener);
339 }
340 self.update_event_listener_state(&listener);
341 }
342
343 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
345 self.pool.read()
346 }
347
348 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
350 let mut out = Vec::new();
351 self.append_pooled_transactions(&mut out);
352 out
353 }
354
355 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
357 let mut out = Vec::new();
358 self.append_pooled_transactions_hashes(&mut out);
359 out
360 }
361
362 pub fn pooled_transactions_max(
364 &self,
365 max: usize,
366 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
367 let mut out = Vec::new();
368 self.append_pooled_transactions_max(max, &mut out);
369 out
370 }
371
372 pub fn append_pooled_transactions(
374 &self,
375 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
376 ) {
377 out.extend(
378 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
379 );
380 }
381
382 pub fn append_pooled_transaction_elements(
385 &self,
386 tx_hashes: &[TxHash],
387 limit: GetPooledTransactionLimit,
388 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
389 ) where
390 <V as TransactionValidator>::Transaction: EthPoolTransaction,
391 {
392 let transactions = self.get_all_propagatable(tx_hashes);
393 let mut size = 0;
394 for transaction in transactions {
395 let encoded_len = transaction.encoded_length();
396 let Some(pooled) = self.to_pooled_transaction(transaction) else {
397 continue;
398 };
399
400 size += encoded_len;
401 out.push(pooled.into_inner());
402
403 if limit.exceeds(size) {
404 break
405 }
406 }
407 }
408
409 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
412 out.extend(
413 self.get_pool_data()
414 .all()
415 .transactions_iter()
416 .filter(|tx| tx.propagate)
417 .map(|tx| *tx.hash()),
418 );
419 }
420
421 pub fn append_pooled_transactions_max(
424 &self,
425 max: usize,
426 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
427 ) {
428 out.extend(
429 self.get_pool_data()
430 .all()
431 .transactions_iter()
432 .filter(|tx| tx.propagate)
433 .take(max)
434 .cloned(),
435 );
436 }
437
438 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
440 if max == 0 {
441 return Vec::new();
442 }
443 self.get_pool_data()
444 .all()
445 .transactions_iter()
446 .filter(|tx| tx.propagate)
447 .take(max)
448 .map(|tx| *tx.hash())
449 .collect()
450 }
451
452 fn to_pooled_transaction(
457 &self,
458 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
459 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
460 where
461 <V as TransactionValidator>::Transaction: EthPoolTransaction,
462 {
463 if transaction.is_eip4844() {
464 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
465 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
466 } else {
467 transaction
468 .transaction
469 .clone_into_pooled()
470 .inspect_err(|err| {
471 debug!(
472 target: "txpool", %err,
473 "failed to convert transaction to pooled element; skipping",
474 );
475 })
476 .ok()
477 }
478 }
479
480 pub fn get_pooled_transaction_elements(
483 &self,
484 tx_hashes: Vec<TxHash>,
485 limit: GetPooledTransactionLimit,
486 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
487 where
488 <V as TransactionValidator>::Transaction: EthPoolTransaction,
489 {
490 let mut elements = Vec::new();
491 self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
492 elements.shrink_to_fit();
493 elements
494 }
495
496 pub fn get_pooled_transaction_element(
498 &self,
499 tx_hash: TxHash,
500 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
501 where
502 <V as TransactionValidator>::Transaction: EthPoolTransaction,
503 {
504 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
505 }
506
507 pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
509 trace!(target: "txpool", ?update, "updating pool on canonical state change");
510
511 let block_info = update.block_info();
512 let CanonicalStateUpdate {
513 new_tip, changed_accounts, mined_transactions, update_kind, ..
514 } = update;
515 self.validator.on_new_head_block(new_tip);
516
517 let changed_senders = self.changed_senders(changed_accounts.into_iter());
518
519 let outcome = self.pool.write().on_canonical_state_change(
521 block_info,
522 mined_transactions,
523 changed_senders,
524 update_kind,
525 );
526
527 self.delete_discarded_blobs(outcome.discarded.iter());
529
530 self.notify_on_new_state(outcome);
532 }
533
534 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
540 let changed_senders = self.changed_senders(accounts.into_iter());
541 let UpdateOutcome { promoted, discarded } =
542 self.pool.write().update_accounts(changed_senders);
543
544 self.notify_on_transaction_updates(promoted, discarded);
545 }
546
547 fn add_transaction(
555 &self,
556 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
557 origin: TransactionOrigin,
558 tx: TransactionValidationOutcome<T::Transaction>,
559 ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
560 match tx {
561 TransactionValidationOutcome::Valid {
562 balance,
563 state_nonce,
564 transaction,
565 propagate,
566 bytecode_hash,
567 authorities,
568 } => {
569 let sender_id = self.get_sender_id(transaction.sender());
570 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
571
572 let (transaction, blob_sidecar) = match transaction {
574 ValidTransaction::Valid(tx) => (tx, None),
575 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
576 debug_assert!(
577 transaction.is_eip4844(),
578 "validator returned sidecar for non EIP-4844 transaction"
579 );
580 (transaction, Some(sidecar))
581 }
582 };
583
584 let tx = ValidPoolTransaction {
585 transaction,
586 transaction_id,
587 propagate,
588 timestamp: Instant::now(),
589 origin,
590 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
591 };
592
593 let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
594 Ok(added) => added,
595 Err(err) => return (Err(err), None),
596 };
597 let hash = *added.hash();
598 let state = added.transaction_state();
599
600 let meta = AddedTransactionMeta { added, blob_sidecar };
601
602 (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
603 }
604 TransactionValidationOutcome::Invalid(tx, err) => {
605 self.with_event_listener(|listener| listener.invalid(tx.hash()));
606 (Err(PoolError::new(*tx.hash(), err)), None)
607 }
608 TransactionValidationOutcome::Error(tx_hash, err) => {
609 self.with_event_listener(|listener| listener.discarded(&tx_hash));
610 (Err(PoolError::other(tx_hash, err)), None)
611 }
612 }
613 }
614
615 pub fn add_transaction_and_subscribe(
617 &self,
618 origin: TransactionOrigin,
619 tx: TransactionValidationOutcome<T::Transaction>,
620 ) -> PoolResult<TransactionEvents> {
621 let listener = {
622 let mut listener = self.event_listener.write();
623 let events = listener.subscribe(tx.tx_hash());
624 self.mark_event_listener_installed();
625 events
626 };
627 let mut results = self.add_transactions(origin, std::iter::once(tx));
628 results.pop().expect("result length is the same as the input")?;
629 Ok(listener)
630 }
631
632 pub fn add_transactions(
637 &self,
638 origin: TransactionOrigin,
639 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
640 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
641 self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
642 }
643
644 pub fn add_transactions_with_origins(
647 &self,
648 transactions: impl IntoIterator<
649 Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
650 >,
651 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
652 let (mut results, added_metas, discarded) = {
654 let mut pool = self.pool.write();
655 let mut added_metas = Vec::new();
656
657 let results = transactions
658 .into_iter()
659 .map(|(origin, tx)| {
660 let (result, meta) = self.add_transaction(&mut pool, origin, tx);
661
662 if result.is_ok() &&
664 let Some(meta) = meta
665 {
666 added_metas.push(meta);
667 }
668
669 result
670 })
671 .collect::<Vec<_>>();
672
673 let discarded = if results.iter().any(Result::is_ok) {
675 pool.discard_worst()
676 } else {
677 Default::default()
678 };
679
680 (results, added_metas, discarded)
681 };
682
683 for meta in added_metas {
684 self.on_added_transaction(meta);
685 }
686
687 if !discarded.is_empty() {
688 self.delete_discarded_blobs(discarded.iter());
690 self.with_event_listener(|listener| listener.discarded_many(&discarded));
691
692 let discarded_hashes =
693 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
694
695 for res in &mut results {
698 if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
699 discarded_hashes.contains(hash)
700 {
701 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
702 }
703 }
704 };
705
706 results
707 }
708
709 fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
714 if let Some(sidecar) = meta.blob_sidecar {
716 let hash = *meta.added.hash();
717 self.on_new_blob_sidecar(&hash, &sidecar);
718 self.insert_blob(hash, sidecar);
719 }
720
721 if let Some(replaced) = meta.added.replaced_blob_transaction() {
723 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
724 self.delete_blob(replaced);
725 }
726
727 if let Some(discarded) = meta.added.discarded_transactions() {
729 self.delete_discarded_blobs(discarded.iter());
730 }
731
732 if let Some(pending) = meta.added.as_pending() {
734 self.on_new_pending_transaction(pending);
735 }
736
737 self.notify_event_listeners(&meta.added);
739
740 self.on_new_transaction(meta.added.into_new_transaction_event());
742 }
743
744 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
753 let mut needs_cleanup = false;
754
755 {
756 let listeners = self.pending_transaction_listener.read();
757 for listener in listeners.iter() {
758 if !listener.send_all(pending.pending_transactions(listener.kind)) {
759 needs_cleanup = true;
760 }
761 }
762 }
763
764 if needs_cleanup {
766 self.pending_transaction_listener
767 .write()
768 .retain(|listener| !listener.sender.is_closed());
769 }
770 }
771
772 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
781 let mut needs_cleanup = false;
782
783 {
784 let listeners = self.transaction_listener.read();
785 for listener in listeners.iter() {
786 if listener.kind.is_propagate_only() && !event.transaction.propagate {
787 if listener.sender.is_closed() {
788 needs_cleanup = true;
789 }
790 continue
792 }
793
794 if !listener.send(event.clone()) {
795 needs_cleanup = true;
796 }
797 }
798 }
799
800 if needs_cleanup {
802 self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
803 }
804 }
805
806 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
808 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
809 if sidecar_listeners.is_empty() {
810 return
811 }
812 let sidecar = Arc::new(sidecar.clone());
813 sidecar_listeners.retain_mut(|listener| {
814 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
815 match listener.sender.try_send(new_blob_event) {
816 Ok(()) => true,
817 Err(err) => {
818 if matches!(err, mpsc::error::TrySendError::Full(_)) {
819 debug!(
820 target: "txpool",
821 "[{:?}] failed to send blob sidecar; channel full",
822 sidecar,
823 );
824 true
825 } else {
826 false
827 }
828 }
829 }
830 })
831 }
832
833 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
835 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
836
837 let mut needs_pending_cleanup = false;
839 {
840 let listeners = self.pending_transaction_listener.read();
841 for listener in listeners.iter() {
842 if !listener.send_all(outcome.pending_transactions(listener.kind)) {
843 needs_pending_cleanup = true;
844 }
845 }
846 }
847 if needs_pending_cleanup {
848 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
849 }
850
851 let mut needs_tx_cleanup = false;
853 {
854 let listeners = self.transaction_listener.read();
855 for listener in listeners.iter() {
856 if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
857 needs_tx_cleanup = true;
858 }
859 }
860 }
861 if needs_tx_cleanup {
862 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
863 }
864
865 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
866
867 self.with_event_listener(|listener| {
869 for tx in &mined {
870 listener.mined(tx, block_hash);
871 }
872 for tx in &promoted {
873 listener.pending(tx.hash(), None);
874 }
875 for tx in &discarded {
876 listener.discarded(tx.hash());
877 }
878 })
879 }
880
881 #[allow(clippy::type_complexity)]
890 pub fn notify_on_transaction_updates(
891 &self,
892 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
893 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
894 ) {
895 if !promoted.is_empty() {
897 let mut needs_pending_cleanup = false;
898 {
899 let listeners = self.pending_transaction_listener.read();
900 for listener in listeners.iter() {
901 let promoted_hashes = promoted.iter().filter_map(|tx| {
902 if listener.kind.is_propagate_only() && !tx.propagate {
903 None
904 } else {
905 Some(*tx.hash())
906 }
907 });
908 if !listener.send_all(promoted_hashes) {
909 needs_pending_cleanup = true;
910 }
911 }
912 }
913 if needs_pending_cleanup {
914 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
915 }
916
917 let mut needs_tx_cleanup = false;
919 {
920 let listeners = self.transaction_listener.read();
921 for listener in listeners.iter() {
922 let promoted_txs = promoted.iter().filter_map(|tx| {
923 if listener.kind.is_propagate_only() && !tx.propagate {
924 None
925 } else {
926 Some(NewTransactionEvent::pending(tx.clone()))
927 }
928 });
929 if !listener.send_all(promoted_txs) {
930 needs_tx_cleanup = true;
931 }
932 }
933 }
934 if needs_tx_cleanup {
935 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
936 }
937 }
938
939 self.with_event_listener(|listener| {
940 for tx in &promoted {
941 listener.pending(tx.hash(), None);
942 }
943 for tx in &discarded {
944 listener.discarded(tx.hash());
945 }
946 });
947
948 if !discarded.is_empty() {
949 self.delete_discarded_blobs(discarded.iter());
952 }
953 }
954
955 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
964 self.with_event_listener(|listener| match tx {
965 AddedTransaction::Pending(tx) => {
966 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
967
968 listener.pending(transaction.hash(), replaced.clone());
969 for tx in promoted {
970 listener.pending(tx.hash(), None);
971 }
972 for tx in discarded {
973 listener.discarded(tx.hash());
974 }
975 }
976 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
977 listener.queued(transaction.hash(), queued_reason.clone());
978 if let Some(replaced) = replaced {
979 listener.replaced(replaced.clone(), *transaction.hash());
980 }
981 }
982 });
983 }
984
985 pub fn best_transactions(&self) -> BestTransactions<T> {
987 self.get_pool_data().best_transactions()
988 }
989
990 pub fn best_transactions_with_attributes(
993 &self,
994 best_transactions_attributes: BestTransactionsAttributes,
995 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
996 {
997 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
998 }
999
1000 pub fn pending_transactions_max(
1002 &self,
1003 max: usize,
1004 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1005 self.get_pool_data().pending_transactions_iter().take(max).collect()
1006 }
1007
1008 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1010 self.get_pool_data().pending_transactions()
1011 }
1012
1013 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1015 self.get_pool_data().queued_transactions()
1016 }
1017
1018 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1020 let pool = self.get_pool_data();
1021 AllPoolTransactions {
1022 pending: pool.pending_transactions(),
1023 queued: pool.queued_transactions(),
1024 }
1025 }
1026
1027 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1029 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1030 }
1031
1032 pub fn remove_transactions(
1037 &self,
1038 hashes: Vec<TxHash>,
1039 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1040 if hashes.is_empty() {
1041 return Vec::new()
1042 }
1043 let removed = self.pool.write().remove_transactions(hashes);
1044
1045 self.with_event_listener(|listener| listener.discarded_many(&removed));
1046
1047 removed
1048 }
1049
1050 pub fn remove_transactions_and_descendants(
1053 &self,
1054 hashes: Vec<TxHash>,
1055 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1056 if hashes.is_empty() {
1057 return Vec::new()
1058 }
1059 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1060
1061 self.with_event_listener(|listener| {
1062 for tx in &removed {
1063 listener.discarded(tx.hash());
1064 }
1065 });
1066
1067 removed
1068 }
1069
1070 pub fn remove_transactions_by_sender(
1072 &self,
1073 sender: Address,
1074 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1075 let sender_id = self.get_sender_id(sender);
1076 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1077
1078 self.with_event_listener(|listener| listener.discarded_many(&removed));
1079
1080 removed
1081 }
1082
1083 pub fn prune_transactions(
1088 &self,
1089 hashes: Vec<TxHash>,
1090 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1091 if hashes.is_empty() {
1092 return Vec::new()
1093 }
1094
1095 self.pool.write().prune_transactions(hashes)
1096 }
1097
1098 pub fn retain_unknown<A>(&self, announcement: &mut A)
1100 where
1101 A: HandleMempoolData,
1102 {
1103 if announcement.is_empty() {
1104 return
1105 }
1106 let pool = self.get_pool_data();
1107 announcement.retain_by_hash(|tx| !pool.contains(tx))
1108 }
1109
1110 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1112 self.get_pool_data().get(tx_hash)
1113 }
1114
1115 pub fn get_transactions_by_sender(
1117 &self,
1118 sender: Address,
1119 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1120 let sender_id = self.get_sender_id(sender);
1121 self.get_pool_data().get_transactions_by_sender(sender_id)
1122 }
1123
1124 pub fn get_pending_transaction_by_sender_and_nonce(
1126 &self,
1127 sender: Address,
1128 nonce: u64,
1129 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1130 let sender_id = self.get_sender_id(sender);
1131 self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1132 }
1133
1134 pub fn get_queued_transactions_by_sender(
1136 &self,
1137 sender: Address,
1138 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1139 let sender_id = self.get_sender_id(sender);
1140 self.get_pool_data().queued_txs_by_sender(sender_id)
1141 }
1142
1143 pub fn pending_transactions_with_predicate(
1145 &self,
1146 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1147 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1148 self.get_pool_data().pending_transactions_with_predicate(predicate)
1149 }
1150
1151 pub fn get_pending_transactions_by_sender(
1153 &self,
1154 sender: Address,
1155 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1156 let sender_id = self.get_sender_id(sender);
1157 self.get_pool_data().pending_txs_by_sender(sender_id)
1158 }
1159
1160 pub fn get_highest_transaction_by_sender(
1162 &self,
1163 sender: Address,
1164 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1165 let sender_id = self.get_sender_id(sender);
1166 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1167 }
1168
1169 pub fn get_highest_consecutive_transaction_by_sender(
1171 &self,
1172 sender: Address,
1173 on_chain_nonce: u64,
1174 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1175 let sender_id = self.get_sender_id(sender);
1176 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1177 sender_id.into_transaction_id(on_chain_nonce),
1178 )
1179 }
1180
1181 pub fn get_transaction_by_transaction_id(
1183 &self,
1184 transaction_id: &TransactionId,
1185 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1186 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1187 }
1188
1189 pub fn get_transactions_by_origin(
1191 &self,
1192 origin: TransactionOrigin,
1193 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1194 self.get_pool_data()
1195 .all()
1196 .transactions_iter()
1197 .filter(|tx| tx.origin == origin)
1198 .cloned()
1199 .collect()
1200 }
1201
1202 pub fn get_pending_transactions_by_origin(
1204 &self,
1205 origin: TransactionOrigin,
1206 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1207 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1208 }
1209
1210 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1214 if txs.is_empty() {
1215 return Vec::new()
1216 }
1217 self.get_pool_data().get_all(txs).collect()
1218 }
1219
1220 fn get_all_propagatable(
1224 &self,
1225 txs: &[TxHash],
1226 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1227 if txs.is_empty() {
1228 return Vec::new()
1229 }
1230 let pool = self.get_pool_data();
1231 txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1232 }
1233
1234 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1236 if txs.0.is_empty() {
1237 return
1238 }
1239 self.with_event_listener(|listener| {
1240 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1241 });
1242 }
1243
1244 pub fn len(&self) -> usize {
1246 self.get_pool_data().len()
1247 }
1248
1249 pub fn is_empty(&self) -> bool {
1251 self.get_pool_data().is_empty()
1252 }
1253
1254 pub fn is_exceeded(&self) -> bool {
1256 self.pool.read().is_exceeded()
1257 }
1258
1259 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1261 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1262 if let Err(err) = self.blob_store.insert(hash, blob) {
1263 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1264 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1265 }
1266 self.update_blob_store_metrics();
1267 }
1268
1269 pub fn delete_blob(&self, blob: TxHash) {
1271 let _ = self.blob_store.delete(blob);
1272 }
1273
1274 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1276 let _ = self.blob_store.delete_all(txs);
1277 }
1278
1279 pub fn cleanup_blobs(&self) {
1281 let stat = self.blob_store.cleanup();
1282 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1283 self.update_blob_store_metrics();
1284 }
1285
1286 fn update_blob_store_metrics(&self) {
1287 if let Some(data_size) = self.blob_store.data_size_hint() {
1288 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1289 }
1290 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1291 }
1292
1293 fn delete_discarded_blobs<'a>(
1295 &'a self,
1296 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1297 ) {
1298 let blob_txs = transactions
1299 .into_iter()
1300 .filter(|tx| tx.transaction.is_eip4844())
1301 .map(|tx| *tx.hash())
1302 .collect();
1303 self.delete_blobs(blob_txs);
1304 }
1305}
1306
1307impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1308 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1309 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1310 }
1311}
1312
/// Data about a successfully inserted transaction that is needed for the follow-up work done
/// after the pool lock is released (see `on_added_transaction`).
#[derive(Debug)]
struct AddedTransactionMeta<T: PoolTransaction> {
    /// The result of inserting the transaction into the pool.
    added: AddedTransaction<T>,
    /// The sidecar returned by the validator, if the transaction came with one.
    blob_sidecar: Option<BlobTransactionSidecarVariant>,
}
1324
/// A transaction that was added as pending, together with the side effects of the insertion.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {
    /// The transaction that was inserted.
    pub transaction: Arc<ValidPoolTransaction<T>>,
    /// The transaction that was replaced by the inserted one, if any.
    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
    /// Transactions reported as `pending` to listeners alongside the inserted one.
    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions reported as `discarded` to listeners as a result of the insertion.
    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1337
1338impl<T: PoolTransaction> AddedPendingTransaction<T> {
1339 pub(crate) fn pending_transactions(
1345 &self,
1346 kind: TransactionListenerKind,
1347 ) -> impl Iterator<Item = B256> + '_ {
1348 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1349 PendingTransactionIter { kind, iter }
1350 }
1351}
1352
/// Iterator adapter that yields the hashes of the transactions produced by `iter`.
///
/// When `kind` is propagate-only, transactions whose `propagate` flag is unset are skipped.
pub(crate) struct PendingTransactionIter<Iter> {
    /// The kind of listener the hashes are produced for.
    kind: TransactionListenerKind,
    /// The underlying transactions to walk.
    iter: Iter,
}
1357
1358impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1359where
1360 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1361 T: PoolTransaction + 'a,
1362{
1363 type Item = B256;
1364
1365 fn next(&mut self) -> Option<Self::Item> {
1366 loop {
1367 let next = self.iter.next()?;
1368 if self.kind.is_propagate_only() && !next.propagate {
1369 continue
1370 }
1371 return Some(*next.hash())
1372 }
1373 }
1374}
1375
/// Iterator adapter that yields a [`NewTransactionEvent`] for the transactions produced by
/// `iter`.
///
/// When `kind` is propagate-only, transactions whose `propagate` flag is unset are skipped.
pub(crate) struct FullPendingTransactionIter<Iter> {
    /// The kind of listener the events are produced for.
    kind: TransactionListenerKind,
    /// The underlying transactions to walk.
    iter: Iter,
}
1381
1382impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1383where
1384 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1385 T: PoolTransaction + 'a,
1386{
1387 type Item = NewTransactionEvent<T>;
1388
1389 fn next(&mut self) -> Option<Self::Item> {
1390 loop {
1391 let next = self.iter.next()?;
1392 if self.kind.is_propagate_only() && !next.propagate {
1393 continue
1394 }
1395 return Some(NewTransactionEvent {
1396 subpool: SubPool::Pending,
1397 transaction: next.clone(),
1398 })
1399 }
1400 }
1401}
1402
/// Represents a transaction that was added to the pool, and where it ended up.
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {
    /// The transaction was added to the pending sub-pool.
    Pending(AddedPendingTransaction<T>),
    /// The transaction was added but parked in the sub-pool given by `subpool`.
    Parked {
        /// The transaction that was added.
        transaction: Arc<ValidPoolTransaction<T>>,
        /// The transaction that was replaced by the added one, if any.
        replaced: Option<Arc<ValidPoolTransaction<T>>>,
        /// The sub-pool the transaction was placed in.
        subpool: SubPool,
        /// Why the transaction is queued, if known.
        ///
        /// When this is `None`, `transaction_state` falls back to [`QueuedReason::NonceGap`].
        queued_reason: Option<QueuedReason>,
    },
}
1421
1422impl<T: PoolTransaction> AddedTransaction<T> {
1423 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1425 match self {
1426 Self::Pending(tx) => Some(tx),
1427 _ => None,
1428 }
1429 }
1430
1431 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1433 match self {
1434 Self::Pending(tx) => tx.replaced.as_ref(),
1435 Self::Parked { replaced, .. } => replaced.as_ref(),
1436 }
1437 }
1438
1439 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1441 match self {
1442 Self::Pending(tx) => Some(&tx.discarded),
1443 Self::Parked { .. } => None,
1444 }
1445 }
1446
1447 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1449 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1450 }
1451
1452 pub fn hash(&self) -> &TxHash {
1454 match self {
1455 Self::Pending(tx) => tx.transaction.hash(),
1456 Self::Parked { transaction, .. } => transaction.hash(),
1457 }
1458 }
1459
1460 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1462 match self {
1463 Self::Pending(tx) => {
1464 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1465 }
1466 Self::Parked { transaction, subpool, .. } => {
1467 NewTransactionEvent { transaction, subpool }
1468 }
1469 }
1470 }
1471
1472 pub(crate) const fn subpool(&self) -> SubPool {
1474 match self {
1475 Self::Pending(_) => SubPool::Pending,
1476 Self::Parked { subpool, .. } => *subpool,
1477 }
1478 }
1479
1480 #[cfg(test)]
1482 pub(crate) fn id(&self) -> &TransactionId {
1483 match self {
1484 Self::Pending(added) => added.transaction.id(),
1485 Self::Parked { transaction, .. } => transaction.id(),
1486 }
1487 }
1488
1489 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1491 match self {
1492 Self::Pending(_) => None,
1493 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1494 }
1495 }
1496
1497 pub fn transaction_state(&self) -> AddedTransactionState {
1499 match self.subpool() {
1500 SubPool::Pending => AddedTransactionState::Pending,
1501 _ => {
1502 if let Some(reason) = self.queued_reason() {
1505 AddedTransactionState::Queued(reason.clone())
1506 } else {
1507 AddedTransactionState::Queued(QueuedReason::NonceGap)
1509 }
1510 }
1511 }
1512 }
1513}
1514
/// The reason attached to a transaction that is queued rather than pending.
///
/// NOTE(review): the per-variant descriptions below are derived from the variant names only —
/// confirm against the code that assigns them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// The transaction has a nonce gap.
    NonceGap,
    /// The transaction has parked ancestors.
    ParkedAncestors,
    /// The balance is insufficient for the transaction.
    InsufficientBalance,
    /// The transaction uses too much gas.
    TooMuchGas,
    /// The transaction does not satisfy the base fee.
    InsufficientBaseFee,
    /// The transaction does not satisfy the blob fee.
    InsufficientBlobFee,
}
1531
/// The state of a transaction right after it was added to the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddedTransactionState {
    /// The transaction is in the pending sub-pool.
    Pending,
    /// The transaction is queued, for the given reason.
    Queued(QueuedReason),
}
1540
1541impl AddedTransactionState {
1542 pub const fn is_queued(&self) -> bool {
1544 matches!(self, Self::Queued(_))
1545 }
1546
1547 pub const fn is_pending(&self) -> bool {
1549 matches!(self, Self::Pending)
1550 }
1551
1552 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1554 match self {
1555 Self::Queued(reason) => Some(reason),
1556 Self::Pending => None,
1557 }
1558 }
1559}
1560
/// The outcome of adding a transaction to the pool: its hash and the state it ended up in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AddedTransactionOutcome {
    /// The hash of the transaction.
    pub hash: TxHash,
    /// The state of the transaction.
    pub state: AddedTransactionState,
}
1569
1570impl AddedTransactionOutcome {
1571 pub const fn is_queued(&self) -> bool {
1573 self.state.is_queued()
1574 }
1575
1576 pub const fn is_pending(&self) -> bool {
1578 self.state.is_pending()
1579 }
1580}
1581
/// Describes how the pool changed in response to a canonical state update.
#[derive(Debug)]
pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
    /// The hash of the block.
    ///
    /// NOTE(review): presumably the new canonical tip — confirm at the construction site.
    pub(crate) block_hash: B256,
    /// Hashes of the transactions that were mined.
    pub(crate) mined: Vec<TxHash>,
    /// Transactions that were promoted; these are the ones reported as pending.
    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that were discarded.
    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1594
1595impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1596 pub(crate) fn pending_transactions(
1602 &self,
1603 kind: TransactionListenerKind,
1604 ) -> impl Iterator<Item = B256> + '_ {
1605 let iter = self.promoted.iter();
1606 PendingTransactionIter { kind, iter }
1607 }
1608
1609 pub(crate) fn full_pending_transactions(
1615 &self,
1616 kind: TransactionListenerKind,
1617 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1618 let iter = self.promoted.iter();
1619 FullPendingTransactionIter { kind, iter }
1620 }
1621}
1622
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool limit allows and checks that the
    /// pool's blob store ends up holding exactly the sidecars of the first `max_txs` of them.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Load the hex-encoded blob from the crate's JSON test fixture.
        let blobs = {
            let json_content = fs::read_to_string(
                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
            )
            .expect("Failed to read the blob data file");

            let json_value: serde_json::Value =
                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");

            // The blob is stored as a string under the `data` key.
            vec![
                json_value
                    .get("data")
                    .unwrap()
                    .as_str()
                    .expect("Data is not a valid string")
                    .to_string(),
            ]
        };

        // Build the sidecar that every transaction in this test shares.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
        );

        // Limit the blob sub-pool to 1000 transactions (`max_txs`).
        // NOTE(review): the second argument is presumably the size limit, set to `usize::MAX`
        // so that only the transaction count limits the sub-pool — confirm.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Create a pool that uses this blob limit and defaults for everything else.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        // Set the block info with a pending blob fee.
        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // The blob store we expect the pool's blob store to be equal to at the end.
        let blob_store = InMemoryBlobStore::default();

        // Add ten transactions more than the blob sub-pool limit allows.
        for n in 0..blob_limit.max_txs + 10 {
            // Create a mock blob transaction that carries the sidecar.
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Give the transaction a large non-zero size; the sizes of the retained
            // transactions add up to the `blob_size` asserted below.
            tx.set_size(1844674407370951);

            // Only the first `max_txs` sidecars go into the expected store.
            if n < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            // Add the transaction to the pool as a valid external transaction with its sidecar.
            test_pool.add_transactions(
                TransactionOrigin::External,
                [TransactionValidationOutcome::Valid {
                    balance: U256::from(1_000),
                    state_nonce: 0,
                    bytecode_hash: None,
                    transaction: ValidTransaction::ValidWithSidecar {
                        transaction: tx,
                        sidecar: sidecar.clone(),
                    },
                    propagate: true,
                    authorities: None,
                }],
            );
        }

        // The blob sub-pool holds exactly `max_txs` transactions.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // The reported blob size is 1000 times the per-transaction size set above.
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // The pool's blob store matches the expected store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    /// Checks that an authority listed in a transaction's validation outcome is registered in
    /// the pool's sender identifiers.
    #[test]
    fn test_auths_stored_in_identifiers() {
        // Create a pool with the default configuration.
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let auth = Address::new([1; 20]);
        let tx = MockTransaction::eip7702();

        // Add an EIP-7702 transaction whose validation outcome lists `auth` as an authority.
        test_pool.add_transactions(
            TransactionOrigin::Local,
            [TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(tx),
                propagate: true,
                authorities: Some(vec![auth]),
            }],
        );

        // The authority address must have been assigned a sender id.
        // NOTE(review): the id is presumably 1 because the transaction's sender was assigned
        // the first id — confirm.
        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
    }
}