1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use best::BestTransactions;
93use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
94use reth_eth_wire_types::HandleMempoolData;
95use reth_execution_types::ChangedAccount;
96
97use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
98use reth_primitives_traits::Recovered;
99use rustc_hash::FxHashMap;
100use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
101use tokio::sync::mpsc;
102use tracing::{debug, trace, warn};
103mod events;
104pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
105pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
106pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
107pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
108pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
109pub use pending::PendingPool;
110use reth_primitives_traits::Block;
111
112mod best;
113mod blob;
114mod listener;
115mod parked;
116pub mod pending;
117pub(crate) mod size;
118pub(crate) mod state;
119pub mod txpool;
120mod update;
121
122pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
124pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
126
127const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
128
129pub struct PoolInner<V, T, S>
131where
132 T: TransactionOrdering,
133{
134 identifiers: RwLock<SenderIdentifiers>,
136 validator: V,
138 blob_store: S,
140 pool: RwLock<TxPool<T>>,
142 config: PoolConfig,
144 event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
146 pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
148 transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
150 blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
152 blob_store_metrics: BlobStoreMetrics,
154}
155
156impl<V, T, S> PoolInner<V, T, S>
159where
160 V: TransactionValidator,
161 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
162 S: BlobStore,
163{
164 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
166 Self {
167 identifiers: Default::default(),
168 validator,
169 event_listener: Default::default(),
170 pool: RwLock::new(TxPool::new(ordering, config.clone())),
171 pending_transaction_listener: Default::default(),
172 transaction_listener: Default::default(),
173 blob_transaction_sidecar_listener: Default::default(),
174 config,
175 blob_store,
176 blob_store_metrics: Default::default(),
177 }
178 }
179
180 pub const fn blob_store(&self) -> &S {
182 &self.blob_store
183 }
184
185 pub fn size(&self) -> PoolSize {
187 self.get_pool_data().size()
188 }
189
190 pub fn block_info(&self) -> BlockInfo {
192 self.get_pool_data().block_info()
193 }
194 pub fn set_block_info(&self, info: BlockInfo) {
196 self.pool.write().set_block_info(info)
197 }
198
199 pub fn get_sender_id(&self, addr: Address) -> SenderId {
201 self.identifiers.write().sender_id_or_create(addr)
202 }
203
204 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
206 self.identifiers.write().sender_ids_or_create(addrs)
207 }
208
209 pub fn unique_senders(&self) -> HashSet<Address> {
211 self.get_pool_data().unique_senders()
212 }
213
214 fn changed_senders(
217 &self,
218 accs: impl Iterator<Item = ChangedAccount>,
219 ) -> FxHashMap<SenderId, SenderInfo> {
220 let mut identifiers = self.identifiers.write();
221 accs.into_iter()
222 .map(|acc| {
223 let ChangedAccount { address, nonce, balance } = acc;
224 let sender_id = identifiers.sender_id_or_create(address);
225 (sender_id, SenderInfo { state_nonce: nonce, balance })
226 })
227 .collect()
228 }
229
230 pub const fn config(&self) -> &PoolConfig {
232 &self.config
233 }
234
235 pub const fn validator(&self) -> &V {
237 &self.validator
238 }
239
240 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
243 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
244 let listener = PendingTransactionHashListener { sender, kind };
245 self.pending_transaction_listener.lock().push(listener);
246 rx
247 }
248
249 pub fn add_new_transaction_listener(
251 &self,
252 kind: TransactionListenerKind,
253 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
254 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
255 let listener = TransactionListener { sender, kind };
256 self.transaction_listener.lock().push(listener);
257 rx
258 }
259 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
262 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
263 let listener = BlobTransactionSidecarListener { sender };
264 self.blob_transaction_sidecar_listener.lock().push(listener);
265 rx
266 }
267
268 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
271 self.get_pool_data()
272 .contains(&tx_hash)
273 .then(|| self.event_listener.write().subscribe(tx_hash))
274 }
275
276 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
278 self.event_listener.write().subscribe_all()
279 }
280
281 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
283 self.pool.read()
284 }
285
286 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
288 self.get_pool_data()
289 .all()
290 .transactions_iter()
291 .filter(|tx| tx.propagate)
292 .map(|tx| *tx.hash())
293 .collect()
294 }
295
296 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
298 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
299 }
300
301 pub fn pooled_transactions_max(
303 &self,
304 max: usize,
305 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
306 self.get_pool_data()
307 .all()
308 .transactions_iter()
309 .filter(|tx| tx.propagate)
310 .take(max)
311 .cloned()
312 .collect()
313 }
314
315 fn to_pooled_transaction(
320 &self,
321 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
322 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
323 where
324 <V as TransactionValidator>::Transaction: EthPoolTransaction,
325 {
326 if transaction.is_eip4844() {
327 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
328 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
329 } else {
330 transaction
331 .transaction
332 .clone()
333 .try_into_pooled()
334 .inspect_err(|err| {
335 debug!(
336 target: "txpool", %err,
337 "failed to convert transaction to pooled element; skipping",
338 );
339 })
340 .ok()
341 }
342 }
343
344 pub fn get_pooled_transaction_elements(
347 &self,
348 tx_hashes: Vec<TxHash>,
349 limit: GetPooledTransactionLimit,
350 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
351 where
352 <V as TransactionValidator>::Transaction: EthPoolTransaction,
353 {
354 let transactions = self.get_all_propagatable(tx_hashes);
355 let mut elements = Vec::with_capacity(transactions.len());
356 let mut size = 0;
357 for transaction in transactions {
358 let encoded_len = transaction.encoded_length();
359 let Some(pooled) = self.to_pooled_transaction(transaction) else {
360 continue;
361 };
362
363 size += encoded_len;
364 elements.push(pooled.into_inner());
365
366 if limit.exceeds(size) {
367 break
368 }
369 }
370
371 elements
372 }
373
374 pub fn get_pooled_transaction_element(
376 &self,
377 tx_hash: TxHash,
378 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
379 where
380 <V as TransactionValidator>::Transaction: EthPoolTransaction,
381 {
382 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
383 }
384
385 pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
387 where
388 B: Block,
389 {
390 trace!(target: "txpool", ?update, "updating pool on canonical state change");
391
392 let block_info = update.block_info();
393 let CanonicalStateUpdate {
394 new_tip, changed_accounts, mined_transactions, update_kind, ..
395 } = update;
396 self.validator.on_new_head_block(new_tip);
397
398 let changed_senders = self.changed_senders(changed_accounts.into_iter());
399
400 let outcome = self.pool.write().on_canonical_state_change(
402 block_info,
403 mined_transactions,
404 changed_senders,
405 update_kind,
406 );
407
408 self.delete_discarded_blobs(outcome.discarded.iter());
410
411 self.notify_on_new_state(outcome);
413 }
414
415 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
421 let changed_senders = self.changed_senders(accounts.into_iter());
422 let UpdateOutcome { promoted, discarded } =
423 self.pool.write().update_accounts(changed_senders);
424
425 if !promoted.is_empty() {
427 self.pending_transaction_listener.lock().retain_mut(|listener| {
428 let promoted_hashes = promoted.iter().filter_map(|tx| {
429 if listener.kind.is_propagate_only() && !tx.propagate {
430 None
431 } else {
432 Some(*tx.hash())
433 }
434 });
435 listener.send_all(promoted_hashes)
436 });
437
438 self.transaction_listener.lock().retain_mut(|listener| {
440 let promoted_txs = promoted.iter().filter_map(|tx| {
441 if listener.kind.is_propagate_only() && !tx.propagate {
442 None
443 } else {
444 Some(NewTransactionEvent::pending(tx.clone()))
445 }
446 });
447 listener.send_all(promoted_txs)
448 });
449 }
450
451 {
452 let mut listener = self.event_listener.write();
453 if !listener.is_empty() {
454 for tx in &promoted {
455 listener.pending(tx.hash(), None);
456 }
457 for tx in &discarded {
458 listener.discarded(tx.hash());
459 }
460 }
461 }
462
463 self.delete_discarded_blobs(discarded.iter());
466 }
467
468 fn add_transaction(
473 &self,
474 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
475 origin: TransactionOrigin,
476 tx: TransactionValidationOutcome<T::Transaction>,
477 ) -> PoolResult<AddedTransactionOutcome> {
478 match tx {
479 TransactionValidationOutcome::Valid {
480 balance,
481 state_nonce,
482 transaction,
483 propagate,
484 bytecode_hash,
485 authorities,
486 } => {
487 let sender_id = self.get_sender_id(transaction.sender());
488 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
489
490 let (transaction, maybe_sidecar) = match transaction {
492 ValidTransaction::Valid(tx) => (tx, None),
493 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
494 debug_assert!(
495 transaction.is_eip4844(),
496 "validator returned sidecar for non EIP-4844 transaction"
497 );
498 (transaction, Some(sidecar))
499 }
500 };
501
502 let tx = ValidPoolTransaction {
503 transaction,
504 transaction_id,
505 propagate,
506 timestamp: Instant::now(),
507 origin,
508 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
509 };
510
511 let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
512 let hash = *added.hash();
513 let state = added.transaction_state();
514
515 if let Some(sidecar) = maybe_sidecar {
517 self.on_new_blob_sidecar(&hash, &sidecar);
519 self.insert_blob(hash, sidecar);
521 }
522
523 if let Some(replaced) = added.replaced_blob_transaction() {
524 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
525 self.delete_blob(replaced);
527 }
528
529 if let Some(pending) = added.as_pending() {
531 self.on_new_pending_transaction(pending);
532 }
533
534 self.notify_event_listeners(&added);
536
537 if let Some(discarded) = added.discarded_transactions() {
538 self.delete_discarded_blobs(discarded.iter());
539 }
540
541 self.on_new_transaction(added.into_new_transaction_event());
543
544 Ok(AddedTransactionOutcome { hash, state })
545 }
546 TransactionValidationOutcome::Invalid(tx, err) => {
547 let mut listener = self.event_listener.write();
548 listener.invalid(tx.hash());
549 Err(PoolError::new(*tx.hash(), err))
550 }
551 TransactionValidationOutcome::Error(tx_hash, err) => {
552 let mut listener = self.event_listener.write();
553 listener.discarded(&tx_hash);
554 Err(PoolError::other(tx_hash, err))
555 }
556 }
557 }
558
559 pub fn add_transaction_and_subscribe(
561 &self,
562 origin: TransactionOrigin,
563 tx: TransactionValidationOutcome<T::Transaction>,
564 ) -> PoolResult<TransactionEvents> {
565 let listener = {
566 let mut listener = self.event_listener.write();
567 listener.subscribe(tx.tx_hash())
568 };
569 let mut results = self.add_transactions(origin, std::iter::once(tx));
570 results.pop().expect("result length is the same as the input")?;
571 Ok(listener)
572 }
573
574 pub fn add_transactions_with_origins(
581 &self,
582 transactions: impl IntoIterator<
583 Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
584 >,
585 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
586 let (mut added, discarded) = {
588 let mut pool = self.pool.write();
589 let added = transactions
590 .into_iter()
591 .map(|(origin, tx)| self.add_transaction(&mut pool, origin, tx))
592 .collect::<Vec<_>>();
593
594 let discarded = if added.iter().any(Result::is_ok) {
596 pool.discard_worst()
597 } else {
598 Default::default()
599 };
600
601 (added, discarded)
602 };
603
604 if !discarded.is_empty() {
605 self.delete_discarded_blobs(discarded.iter());
607 self.event_listener.write().discarded_many(&discarded);
608
609 let discarded_hashes =
610 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
611
612 for res in &mut added {
615 if let Ok(AddedTransactionOutcome { hash, .. }) = res {
616 if discarded_hashes.contains(hash) {
617 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
618 }
619 }
620 }
621 }
622
623 added
624 }
625
626 pub fn add_transactions(
632 &self,
633 origin: TransactionOrigin,
634 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
635 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
636 self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
637 }
638
639 fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
641 let propagate_allowed = pending.is_propagate_allowed();
642
643 let mut transaction_listeners = self.pending_transaction_listener.lock();
644 transaction_listeners.retain_mut(|listener| {
645 if listener.kind.is_propagate_only() && !propagate_allowed {
646 return !listener.sender.is_closed()
649 }
650
651 listener.send_all(pending.pending_transactions(listener.kind))
653 });
654 }
655
656 fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
658 let mut transaction_listeners = self.transaction_listener.lock();
659 transaction_listeners.retain_mut(|listener| {
660 if listener.kind.is_propagate_only() && !event.transaction.propagate {
661 return !listener.sender.is_closed()
664 }
665
666 listener.send(event.clone())
667 });
668 }
669
670 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
672 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
673 if sidecar_listeners.is_empty() {
674 return
675 }
676 let sidecar = Arc::new(sidecar.clone());
677 sidecar_listeners.retain_mut(|listener| {
678 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
679 match listener.sender.try_send(new_blob_event) {
680 Ok(()) => true,
681 Err(err) => {
682 if matches!(err, mpsc::error::TrySendError::Full(_)) {
683 debug!(
684 target: "txpool",
685 "[{:?}] failed to send blob sidecar; channel full",
686 sidecar,
687 );
688 true
689 } else {
690 false
691 }
692 }
693 }
694 })
695 }
696
697 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
699 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
700
701 self.pending_transaction_listener
704 .lock()
705 .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
706
707 self.transaction_listener.lock().retain_mut(|listener| {
709 listener.send_all(outcome.full_pending_transactions(listener.kind))
710 });
711
712 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
713
714 let mut listener = self.event_listener.write();
716
717 if !listener.is_empty() {
718 for tx in &mined {
719 listener.mined(tx, block_hash);
720 }
721 for tx in &promoted {
722 listener.pending(tx.hash(), None);
723 }
724 for tx in &discarded {
725 listener.discarded(tx.hash());
726 }
727 }
728 }
729
730 fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
732 let mut listener = self.event_listener.write();
733 if listener.is_empty() {
734 return
736 }
737
738 match tx {
739 AddedTransaction::Pending(tx) => {
740 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
741
742 listener.pending(transaction.hash(), replaced.clone());
743 for tx in promoted {
744 listener.pending(tx.hash(), None);
745 }
746 for tx in discarded {
747 listener.discarded(tx.hash());
748 }
749 }
750 AddedTransaction::Parked { transaction, replaced, .. } => {
751 listener.queued(transaction.hash());
752 if let Some(replaced) = replaced {
753 listener.replaced(replaced.clone(), *transaction.hash());
754 }
755 }
756 }
757 }
758
759 pub fn best_transactions(&self) -> BestTransactions<T> {
761 self.get_pool_data().best_transactions()
762 }
763
764 pub fn best_transactions_with_attributes(
767 &self,
768 best_transactions_attributes: BestTransactionsAttributes,
769 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
770 {
771 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
772 }
773
774 pub fn pending_transactions_max(
776 &self,
777 max: usize,
778 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
779 self.get_pool_data().pending_transactions_iter().take(max).collect()
780 }
781
782 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
784 self.get_pool_data().pending_transactions()
785 }
786
787 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
789 self.get_pool_data().queued_transactions()
790 }
791
792 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
794 let pool = self.get_pool_data();
795 AllPoolTransactions {
796 pending: pool.pending_transactions(),
797 queued: pool.queued_transactions(),
798 }
799 }
800
801 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
803 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
804 }
805
806 pub fn remove_transactions(
811 &self,
812 hashes: Vec<TxHash>,
813 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
814 if hashes.is_empty() {
815 return Vec::new()
816 }
817 let removed = self.pool.write().remove_transactions(hashes);
818
819 self.event_listener.write().discarded_many(&removed);
820
821 removed
822 }
823
824 pub fn remove_transactions_and_descendants(
827 &self,
828 hashes: Vec<TxHash>,
829 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
830 if hashes.is_empty() {
831 return Vec::new()
832 }
833 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
834
835 let mut listener = self.event_listener.write();
836
837 for tx in &removed {
838 listener.discarded(tx.hash());
839 }
840
841 removed
842 }
843
844 pub fn remove_transactions_by_sender(
846 &self,
847 sender: Address,
848 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
849 let sender_id = self.get_sender_id(sender);
850 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
851
852 self.event_listener.write().discarded_many(&removed);
853
854 removed
855 }
856
857 pub fn retain_unknown<A>(&self, announcement: &mut A)
859 where
860 A: HandleMempoolData,
861 {
862 if announcement.is_empty() {
863 return
864 }
865 let pool = self.get_pool_data();
866 announcement.retain_by_hash(|tx| !pool.contains(tx))
867 }
868
869 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
871 self.get_pool_data().get(tx_hash)
872 }
873
874 pub fn get_transactions_by_sender(
876 &self,
877 sender: Address,
878 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
879 let sender_id = self.get_sender_id(sender);
880 self.get_pool_data().get_transactions_by_sender(sender_id)
881 }
882
883 pub fn get_queued_transactions_by_sender(
885 &self,
886 sender: Address,
887 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
888 let sender_id = self.get_sender_id(sender);
889 self.get_pool_data().queued_txs_by_sender(sender_id)
890 }
891
892 pub fn pending_transactions_with_predicate(
894 &self,
895 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
896 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
897 self.get_pool_data().pending_transactions_with_predicate(predicate)
898 }
899
900 pub fn get_pending_transactions_by_sender(
902 &self,
903 sender: Address,
904 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
905 let sender_id = self.get_sender_id(sender);
906 self.get_pool_data().pending_txs_by_sender(sender_id)
907 }
908
909 pub fn get_highest_transaction_by_sender(
911 &self,
912 sender: Address,
913 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
914 let sender_id = self.get_sender_id(sender);
915 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
916 }
917
918 pub fn get_highest_consecutive_transaction_by_sender(
920 &self,
921 sender: Address,
922 on_chain_nonce: u64,
923 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
924 let sender_id = self.get_sender_id(sender);
925 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
926 sender_id.into_transaction_id(on_chain_nonce),
927 )
928 }
929
930 pub fn get_transaction_by_transaction_id(
932 &self,
933 transaction_id: &TransactionId,
934 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
935 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
936 }
937
938 pub fn get_transactions_by_origin(
940 &self,
941 origin: TransactionOrigin,
942 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
943 self.get_pool_data()
944 .all()
945 .transactions_iter()
946 .filter(|tx| tx.origin == origin)
947 .cloned()
948 .collect()
949 }
950
951 pub fn get_pending_transactions_by_origin(
953 &self,
954 origin: TransactionOrigin,
955 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
956 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
957 }
958
959 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
963 if txs.is_empty() {
964 return Vec::new()
965 }
966 self.get_pool_data().get_all(txs).collect()
967 }
968
969 fn get_all_propagatable(
973 &self,
974 txs: Vec<TxHash>,
975 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
976 if txs.is_empty() {
977 return Vec::new()
978 }
979 self.get_pool_data().get_all(txs).filter(|tx| tx.propagate).collect()
980 }
981
982 pub fn on_propagated(&self, txs: PropagatedTransactions) {
984 if txs.0.is_empty() {
985 return
986 }
987 let mut listener = self.event_listener.write();
988
989 if !listener.is_empty() {
990 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
991 }
992 }
993
994 pub fn len(&self) -> usize {
996 self.get_pool_data().len()
997 }
998
999 pub fn is_empty(&self) -> bool {
1001 self.get_pool_data().is_empty()
1002 }
1003
1004 pub fn is_exceeded(&self) -> bool {
1006 self.pool.read().is_exceeded()
1007 }
1008
1009 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1011 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1012 if let Err(err) = self.blob_store.insert(hash, blob) {
1013 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1014 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1015 }
1016 self.update_blob_store_metrics();
1017 }
1018
1019 pub fn delete_blob(&self, blob: TxHash) {
1021 let _ = self.blob_store.delete(blob);
1022 }
1023
1024 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1026 let _ = self.blob_store.delete_all(txs);
1027 }
1028
1029 pub fn cleanup_blobs(&self) {
1031 let stat = self.blob_store.cleanup();
1032 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1033 self.update_blob_store_metrics();
1034 }
1035
1036 fn update_blob_store_metrics(&self) {
1037 if let Some(data_size) = self.blob_store.data_size_hint() {
1038 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1039 }
1040 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1041 }
1042
1043 fn delete_discarded_blobs<'a>(
1045 &'a self,
1046 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1047 ) {
1048 let blob_txs = transactions
1049 .into_iter()
1050 .filter(|tx| tx.transaction.is_eip4844())
1051 .map(|tx| *tx.hash())
1052 .collect();
1053 self.delete_blobs(blob_txs);
1054 }
1055}
1056
1057impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1058 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1059 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1060 }
1061}
1062
1063#[derive(Debug, Clone)]
1065pub struct AddedPendingTransaction<T: PoolTransaction> {
1066 transaction: Arc<ValidPoolTransaction<T>>,
1068 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1070 promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1072 discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1074}
1075
1076impl<T: PoolTransaction> AddedPendingTransaction<T> {
1077 pub(crate) fn pending_transactions(
1083 &self,
1084 kind: TransactionListenerKind,
1085 ) -> impl Iterator<Item = B256> + '_ {
1086 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1087 PendingTransactionIter { kind, iter }
1088 }
1089
1090 pub(crate) fn is_propagate_allowed(&self) -> bool {
1092 self.transaction.propagate
1093 }
1094}
1095
1096pub(crate) struct PendingTransactionIter<Iter> {
1097 kind: TransactionListenerKind,
1098 iter: Iter,
1099}
1100
1101impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1102where
1103 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1104 T: PoolTransaction + 'a,
1105{
1106 type Item = B256;
1107
1108 fn next(&mut self) -> Option<Self::Item> {
1109 loop {
1110 let next = self.iter.next()?;
1111 if self.kind.is_propagate_only() && !next.propagate {
1112 continue
1113 }
1114 return Some(*next.hash())
1115 }
1116 }
1117}
1118
1119pub(crate) struct FullPendingTransactionIter<Iter> {
1121 kind: TransactionListenerKind,
1122 iter: Iter,
1123}
1124
1125impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1126where
1127 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1128 T: PoolTransaction + 'a,
1129{
1130 type Item = NewTransactionEvent<T>;
1131
1132 fn next(&mut self) -> Option<Self::Item> {
1133 loop {
1134 let next = self.iter.next()?;
1135 if self.kind.is_propagate_only() && !next.propagate {
1136 continue
1137 }
1138 return Some(NewTransactionEvent {
1139 subpool: SubPool::Pending,
1140 transaction: next.clone(),
1141 })
1142 }
1143 }
1144}
1145
1146#[derive(Debug, Clone)]
1148pub enum AddedTransaction<T: PoolTransaction> {
1149 Pending(AddedPendingTransaction<T>),
1151 Parked {
1154 transaction: Arc<ValidPoolTransaction<T>>,
1156 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1158 subpool: SubPool,
1160 queued_reason: Option<QueuedReason>,
1162 },
1163}
1164
1165impl<T: PoolTransaction> AddedTransaction<T> {
1166 pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1168 match self {
1169 Self::Pending(tx) => Some(tx),
1170 _ => None,
1171 }
1172 }
1173
1174 pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1176 match self {
1177 Self::Pending(tx) => tx.replaced.as_ref(),
1178 Self::Parked { replaced, .. } => replaced.as_ref(),
1179 }
1180 }
1181
1182 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1184 match self {
1185 Self::Pending(tx) => Some(&tx.discarded),
1186 Self::Parked { .. } => None,
1187 }
1188 }
1189
1190 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1192 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1193 }
1194
1195 pub(crate) fn hash(&self) -> &TxHash {
1197 match self {
1198 Self::Pending(tx) => tx.transaction.hash(),
1199 Self::Parked { transaction, .. } => transaction.hash(),
1200 }
1201 }
1202
1203 pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1205 match self {
1206 Self::Pending(tx) => {
1207 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1208 }
1209 Self::Parked { transaction, subpool, .. } => {
1210 NewTransactionEvent { transaction, subpool }
1211 }
1212 }
1213 }
1214
1215 pub(crate) const fn subpool(&self) -> SubPool {
1217 match self {
1218 Self::Pending(_) => SubPool::Pending,
1219 Self::Parked { subpool, .. } => *subpool,
1220 }
1221 }
1222
1223 #[cfg(test)]
1225 pub(crate) fn id(&self) -> &TransactionId {
1226 match self {
1227 Self::Pending(added) => added.transaction.id(),
1228 Self::Parked { transaction, .. } => transaction.id(),
1229 }
1230 }
1231
1232 pub(crate) const fn queued_reason(&self) -> Option<&QueuedReason> {
1234 match self {
1235 Self::Pending(_) => None,
1236 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1237 }
1238 }
1239
1240 pub(crate) fn transaction_state(&self) -> AddedTransactionState {
1242 match self.subpool() {
1243 SubPool::Pending => AddedTransactionState::Pending,
1244 _ => {
1245 if let Some(reason) = self.queued_reason() {
1248 AddedTransactionState::Queued(reason.clone())
1249 } else {
1250 AddedTransactionState::Queued(QueuedReason::NonceGap)
1252 }
1253 }
1254 }
1255 }
1256}
1257
1258#[derive(Debug, Clone, PartialEq, Eq)]
1260pub enum QueuedReason {
1261 NonceGap,
1263 ParkedAncestors,
1265 InsufficientBalance,
1267 TooMuchGas,
1269 InsufficientBaseFee,
1271 InsufficientBlobFee,
1273}
1274
1275#[derive(Debug, Clone, PartialEq, Eq)]
1277pub enum AddedTransactionState {
1278 Pending,
1280 Queued(QueuedReason),
1282}
1283
1284impl AddedTransactionState {
1285 pub const fn is_queued(&self) -> bool {
1287 matches!(self, Self::Queued(_))
1288 }
1289
1290 pub const fn is_pending(&self) -> bool {
1292 matches!(self, Self::Pending)
1293 }
1294
1295 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1297 match self {
1298 Self::Queued(reason) => Some(reason),
1299 Self::Pending => None,
1300 }
1301 }
1302}
1303
1304#[derive(Debug, Clone, PartialEq, Eq)]
1306pub struct AddedTransactionOutcome {
1307 pub hash: TxHash,
1309 pub state: AddedTransactionState,
1311}
1312
1313impl AddedTransactionOutcome {
1314 pub const fn is_queued(&self) -> bool {
1316 self.state.is_queued()
1317 }
1318
1319 pub const fn is_pending(&self) -> bool {
1321 self.state.is_pending()
1322 }
1323}
1324
1325#[derive(Debug)]
1327pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1328 pub(crate) block_hash: B256,
1330 pub(crate) mined: Vec<TxHash>,
1332 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1334 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1336}
1337
1338impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1339 pub(crate) fn pending_transactions(
1345 &self,
1346 kind: TransactionListenerKind,
1347 ) -> impl Iterator<Item = B256> + '_ {
1348 let iter = self.promoted.iter();
1349 PendingTransactionIter { kind, iter }
1350 }
1351
1352 pub(crate) fn full_pending_transactions(
1358 &self,
1359 kind: TransactionListenerKind,
1360 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1361 let iter = self.promoted.iter();
1362 FullPendingTransactionIter { kind, iter }
1363 }
1364}
1365
1366#[cfg(test)]
1367mod tests {
1368 use crate::{
1369 blobstore::{BlobStore, InMemoryBlobStore},
1370 identifier::SenderId,
1371 test_utils::{MockTransaction, TestPoolBuilder},
1372 validate::ValidTransaction,
1373 BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1374 };
1375 use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1376 use alloy_primitives::Address;
1377 use std::{fs, path::PathBuf};
1378
1379 #[test]
1380 fn test_discard_blobs_on_blob_tx_eviction() {
1381 let blobs = {
1382 let json_content = fs::read_to_string(
1384 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1385 )
1386 .expect("Failed to read the blob data file");
1387
1388 let json_value: serde_json::Value =
1390 serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1391
1392 vec![
1394 json_value
1396 .get("data")
1397 .unwrap()
1398 .as_str()
1399 .expect("Data is not a valid string")
1400 .to_string(),
1401 ]
1402 };
1403
1404 let sidecar = BlobTransactionSidecarVariant::Eip4844(
1406 BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1407 );
1408
1409 let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1411
1412 let test_pool = &TestPoolBuilder::default()
1414 .with_config(PoolConfig { blob_limit, ..Default::default() })
1415 .pool;
1416
1417 test_pool
1419 .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1420
1421 let blob_store = InMemoryBlobStore::default();
1423
1424 for n in 0..blob_limit.max_txs + 10 {
1426 let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1428
1429 tx.set_size(1844674407370951);
1431
1432 if n < blob_limit.max_txs {
1434 blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1435 }
1436
1437 test_pool.add_transactions(
1439 TransactionOrigin::External,
1440 [TransactionValidationOutcome::Valid {
1441 balance: U256::from(1_000),
1442 state_nonce: 0,
1443 bytecode_hash: None,
1444 transaction: ValidTransaction::ValidWithSidecar {
1445 transaction: tx,
1446 sidecar: sidecar.clone(),
1447 },
1448 propagate: true,
1449 authorities: None,
1450 }],
1451 );
1452 }
1453
1454 assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1456
1457 assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1459
1460 assert_eq!(*test_pool.blob_store(), blob_store);
1462 }
1463
1464 #[test]
1465 fn test_auths_stored_in_identifiers() {
1466 let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1468
1469 let auth = Address::new([1; 20]);
1470 let tx = MockTransaction::eip7702();
1471
1472 test_pool.add_transactions(
1473 TransactionOrigin::Local,
1474 [TransactionValidationOutcome::Valid {
1475 balance: U256::from(1_000),
1476 state_nonce: 0,
1477 bytecode_hash: None,
1478 transaction: ValidTransaction::Valid(tx),
1479 propagate: true,
1480 authorities: Some(vec![auth]),
1481 }],
1482 );
1483
1484 let identifiers = test_pool.identifiers.read();
1485 assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1486 }
1487}