1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
93use reth_eth_wire_types::HandleMempoolData;
94use reth_execution_types::ChangedAccount;
95
96use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
97use reth_primitives_traits::Recovered;
98use rustc_hash::FxHashMap;
99use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
100use tokio::sync::mpsc;
101use tracing::{debug, trace, warn};
102mod events;
103pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
104pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
105pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
106pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
107pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
108pub use pending::PendingPool;
109use reth_primitives_traits::Block;
110
111mod best;
112pub use best::BestTransactions;
113
114mod blob;
115pub mod listener;
116mod parked;
117pub mod pending;
118pub mod size;
119pub(crate) mod state;
120pub mod txpool;
121mod update;
122
123pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
125pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
127
128const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
129
130pub struct PoolInner<V, T, S>
132where
133 T: TransactionOrdering,
134{
135 identifiers: RwLock<SenderIdentifiers>,
137 validator: V,
139 blob_store: S,
141 pool: RwLock<TxPool<T>>,
143 config: PoolConfig,
145 event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
147 pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
149 transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
151 blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
153 blob_store_metrics: BlobStoreMetrics,
155}
156
157impl<V, T, S> PoolInner<V, T, S>
160where
161 V: TransactionValidator,
162 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
163 S: BlobStore,
164{
165 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
167 Self {
168 identifiers: Default::default(),
169 validator,
170 event_listener: Default::default(),
171 pool: RwLock::new(TxPool::new(ordering, config.clone())),
172 pending_transaction_listener: Default::default(),
173 transaction_listener: Default::default(),
174 blob_transaction_sidecar_listener: Default::default(),
175 config,
176 blob_store,
177 blob_store_metrics: Default::default(),
178 }
179 }
180
181 pub const fn blob_store(&self) -> &S {
183 &self.blob_store
184 }
185
186 pub fn size(&self) -> PoolSize {
188 self.get_pool_data().size()
189 }
190
191 pub fn block_info(&self) -> BlockInfo {
193 self.get_pool_data().block_info()
194 }
195 pub fn set_block_info(&self, info: BlockInfo) {
197 self.pool.write().set_block_info(info)
198 }
199
200 pub fn get_sender_id(&self, addr: Address) -> SenderId {
202 self.identifiers.write().sender_id_or_create(addr)
203 }
204
205 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
207 self.identifiers.write().sender_ids_or_create(addrs)
208 }
209
210 pub fn unique_senders(&self) -> HashSet<Address> {
212 self.get_pool_data().unique_senders()
213 }
214
215 fn changed_senders(
218 &self,
219 accs: impl Iterator<Item = ChangedAccount>,
220 ) -> FxHashMap<SenderId, SenderInfo> {
221 let identifiers = self.identifiers.read();
222 accs.into_iter()
223 .filter_map(|acc| {
224 let ChangedAccount { address, nonce, balance } = acc;
225 let sender_id = identifiers.sender_id(&address)?;
226 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
227 })
228 .collect()
229 }
230
231 pub const fn config(&self) -> &PoolConfig {
233 &self.config
234 }
235
236 pub const fn validator(&self) -> &V {
238 &self.validator
239 }
240
241 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
244 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
245 let listener = PendingTransactionHashListener { sender, kind };
246 self.pending_transaction_listener.lock().push(listener);
247 rx
248 }
249
250 pub fn add_new_transaction_listener(
252 &self,
253 kind: TransactionListenerKind,
254 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
255 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
256 let listener = TransactionListener { sender, kind };
257 self.transaction_listener.lock().push(listener);
258 rx
259 }
260 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
263 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
264 let listener = BlobTransactionSidecarListener { sender };
265 self.blob_transaction_sidecar_listener.lock().push(listener);
266 rx
267 }
268
269 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
272 self.get_pool_data()
273 .contains(&tx_hash)
274 .then(|| self.event_listener.write().subscribe(tx_hash))
275 }
276
277 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
279 self.event_listener.write().subscribe_all()
280 }
281
282 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
284 self.pool.read()
285 }
286
287 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
289 let mut out = Vec::new();
290 self.append_pooled_transactions(&mut out);
291 out
292 }
293
294 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
296 let mut out = Vec::new();
297 self.append_pooled_transactions_hashes(&mut out);
298 out
299 }
300
301 pub fn pooled_transactions_max(
303 &self,
304 max: usize,
305 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
306 let mut out = Vec::new();
307 self.append_pooled_transactions_max(max, &mut out);
308 out
309 }
310
311 pub fn append_pooled_transactions(
313 &self,
314 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
315 ) {
316 out.extend(
317 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
318 );
319 }
320
321 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
324 out.extend(
325 self.get_pool_data()
326 .all()
327 .transactions_iter()
328 .filter(|tx| tx.propagate)
329 .map(|tx| *tx.hash()),
330 );
331 }
332
333 pub fn append_pooled_transactions_max(
336 &self,
337 max: usize,
338 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
339 ) {
340 out.extend(
341 self.get_pool_data()
342 .all()
343 .transactions_iter()
344 .filter(|tx| tx.propagate)
345 .take(max)
346 .cloned(),
347 );
348 }
349
350 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
352 if max == 0 {
353 return Vec::new();
354 }
355 self.get_pool_data()
356 .all()
357 .transactions_iter()
358 .filter(|tx| tx.propagate)
359 .take(max)
360 .map(|tx| *tx.hash())
361 .collect()
362 }
363
364 fn to_pooled_transaction(
369 &self,
370 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
371 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
372 where
373 <V as TransactionValidator>::Transaction: EthPoolTransaction,
374 {
375 if transaction.is_eip4844() {
376 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
377 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
378 } else {
379 transaction
380 .transaction
381 .clone_into_pooled()
382 .inspect_err(|err| {
383 debug!(
384 target: "txpool", %err,
385 "failed to convert transaction to pooled element; skipping",
386 );
387 })
388 .ok()
389 }
390 }
391
392 pub fn get_pooled_transaction_elements(
395 &self,
396 tx_hashes: Vec<TxHash>,
397 limit: GetPooledTransactionLimit,
398 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
399 where
400 <V as TransactionValidator>::Transaction: EthPoolTransaction,
401 {
402 let transactions = self.get_all_propagatable(tx_hashes);
403 let mut elements = Vec::with_capacity(transactions.len());
404 let mut size = 0;
405 for transaction in transactions {
406 let encoded_len = transaction.encoded_length();
407 let Some(pooled) = self.to_pooled_transaction(transaction) else {
408 continue;
409 };
410
411 size += encoded_len;
412 elements.push(pooled.into_inner());
413
414 if limit.exceeds(size) {
415 break
416 }
417 }
418
419 elements
420 }
421
422 pub fn get_pooled_transaction_element(
424 &self,
425 tx_hash: TxHash,
426 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
427 where
428 <V as TransactionValidator>::Transaction: EthPoolTransaction,
429 {
430 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
431 }
432
433 pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
435 where
436 B: Block,
437 {
438 trace!(target: "txpool", ?update, "updating pool on canonical state change");
439
440 let block_info = update.block_info();
441 let CanonicalStateUpdate {
442 new_tip, changed_accounts, mined_transactions, update_kind, ..
443 } = update;
444 self.validator.on_new_head_block(new_tip);
445
446 let changed_senders = self.changed_senders(changed_accounts.into_iter());
447
448 let outcome = self.pool.write().on_canonical_state_change(
450 block_info,
451 mined_transactions,
452 changed_senders,
453 update_kind,
454 );
455
456 self.delete_discarded_blobs(outcome.discarded.iter());
458
459 self.notify_on_new_state(outcome);
461 }
462
463 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
469 let changed_senders = self.changed_senders(accounts.into_iter());
470 let UpdateOutcome { promoted, discarded } =
471 self.pool.write().update_accounts(changed_senders);
472
473 self.notify_on_transaction_updates(promoted, discarded);
474 }
475
476 fn add_transaction(
481 &self,
482 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
483 origin: TransactionOrigin,
484 tx: TransactionValidationOutcome<T::Transaction>,
485 ) -> PoolResult<AddedTransactionOutcome> {
486 match tx {
487 TransactionValidationOutcome::Valid {
488 balance,
489 state_nonce,
490 transaction,
491 propagate,
492 bytecode_hash,
493 authorities,
494 } => {
495 let sender_id = self.get_sender_id(transaction.sender());
496 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
497
498 let (transaction, maybe_sidecar) = match transaction {
500 ValidTransaction::Valid(tx) => (tx, None),
501 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
502 debug_assert!(
503 transaction.is_eip4844(),
504 "validator returned sidecar for non EIP-4844 transaction"
505 );
506 (transaction, Some(sidecar))
507 }
508 };
509
510 let tx = ValidPoolTransaction {
511 transaction,
512 transaction_id,
513 propagate,
514 timestamp: Instant::now(),
515 origin,
516 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
517 };
518
519 let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
520 let hash = *added.hash();
521 let state = added.transaction_state();
522
523 if let Some(sidecar) = maybe_sidecar {
525 self.on_new_blob_sidecar(&hash, &sidecar);
527 self.insert_blob(hash, sidecar);
529 }
530
531 if let Some(replaced) = added.replaced_blob_transaction() {
532 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
533 self.delete_blob(replaced);
535 }
536
537 if let Some(pending) = added.as_pending() {
539 self.on_new_pending_transaction(pending);
540 }
541
542 self.notify_event_listeners(&added);
544
545 if let Some(discarded) = added.discarded_transactions() {
546 self.delete_discarded_blobs(discarded.iter());
547 }
548
549 self.on_new_transaction(added.into_new_transaction_event());
551
552 Ok(AddedTransactionOutcome { hash, state })
553 }
554 TransactionValidationOutcome::Invalid(tx, err) => {
555 let mut listener = self.event_listener.write();
556 listener.invalid(tx.hash());
557 Err(PoolError::new(*tx.hash(), err))
558 }
559 TransactionValidationOutcome::Error(tx_hash, err) => {
560 let mut listener = self.event_listener.write();
561 listener.discarded(&tx_hash);
562 Err(PoolError::other(tx_hash, err))
563 }
564 }
565 }
566
567 pub fn add_transaction_and_subscribe(
569 &self,
570 origin: TransactionOrigin,
571 tx: TransactionValidationOutcome<T::Transaction>,
572 ) -> PoolResult<TransactionEvents> {
573 let listener = {
574 let mut listener = self.event_listener.write();
575 listener.subscribe(tx.tx_hash())
576 };
577 let mut results = self.add_transactions(origin, std::iter::once(tx));
578 results.pop().expect("result length is the same as the input")?;
579 Ok(listener)
580 }
581
582 pub fn add_transactions(
588 &self,
589 origin: TransactionOrigin,
590 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
591 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
592 let (mut added, discarded) = {
594 let mut pool = self.pool.write();
595 let added = transactions
596 .into_iter()
597 .map(|tx| self.add_transaction(&mut pool, origin, tx))
598 .collect::<Vec<_>>();
599
600 let discarded = if added.iter().any(Result::is_ok) {
602 pool.discard_worst()
603 } else {
604 Default::default()
605 };
606
607 (added, discarded)
608 };
609
610 if !discarded.is_empty() {
611 self.delete_discarded_blobs(discarded.iter());
613 self.event_listener.write().discarded_many(&discarded);
614
615 let discarded_hashes =
616 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
617
618 for res in &mut added {
621 if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
622 discarded_hashes.contains(hash)
623 {
624 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
625 }
626 }
627 };
628
629 added
630 }
631
632 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
641 let mut transaction_listeners = self.pending_transaction_listener.lock();
642 transaction_listeners.retain_mut(|listener| {
643 listener.send_all(pending.pending_transactions(listener.kind))
645 });
646 }
647
648 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
657 let mut transaction_listeners = self.transaction_listener.lock();
658 transaction_listeners.retain_mut(|listener| {
659 if listener.kind.is_propagate_only() && !event.transaction.propagate {
660 return !listener.sender.is_closed()
663 }
664
665 listener.send(event.clone())
666 });
667 }
668
669 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
671 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
672 if sidecar_listeners.is_empty() {
673 return
674 }
675 let sidecar = Arc::new(sidecar.clone());
676 sidecar_listeners.retain_mut(|listener| {
677 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
678 match listener.sender.try_send(new_blob_event) {
679 Ok(()) => true,
680 Err(err) => {
681 if matches!(err, mpsc::error::TrySendError::Full(_)) {
682 debug!(
683 target: "txpool",
684 "[{:?}] failed to send blob sidecar; channel full",
685 sidecar,
686 );
687 true
688 } else {
689 false
690 }
691 }
692 }
693 })
694 }
695
696 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
698 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
699
700 self.pending_transaction_listener
703 .lock()
704 .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
705
706 self.transaction_listener.lock().retain_mut(|listener| {
708 listener.send_all(outcome.full_pending_transactions(listener.kind))
709 });
710
711 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
712
713 let mut listener = self.event_listener.write();
715
716 if !listener.is_empty() {
717 for tx in &mined {
718 listener.mined(tx, block_hash);
719 }
720 for tx in &promoted {
721 listener.pending(tx.hash(), None);
722 }
723 for tx in &discarded {
724 listener.discarded(tx.hash());
725 }
726 }
727 }
728
729 #[allow(clippy::type_complexity)]
738 pub fn notify_on_transaction_updates(
739 &self,
740 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
741 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
742 ) {
743 if !promoted.is_empty() {
745 self.pending_transaction_listener.lock().retain_mut(|listener| {
746 let promoted_hashes = promoted.iter().filter_map(|tx| {
747 if listener.kind.is_propagate_only() && !tx.propagate {
748 None
749 } else {
750 Some(*tx.hash())
751 }
752 });
753 listener.send_all(promoted_hashes)
754 });
755
756 self.transaction_listener.lock().retain_mut(|listener| {
758 let promoted_txs = promoted.iter().filter_map(|tx| {
759 if listener.kind.is_propagate_only() && !tx.propagate {
760 None
761 } else {
762 Some(NewTransactionEvent::pending(tx.clone()))
763 }
764 });
765 listener.send_all(promoted_txs)
766 });
767 }
768
769 {
770 let mut listener = self.event_listener.write();
771 if !listener.is_empty() {
772 for tx in &promoted {
773 listener.pending(tx.hash(), None);
774 }
775 for tx in &discarded {
776 listener.discarded(tx.hash());
777 }
778 }
779 }
780
781 if !discarded.is_empty() {
782 self.delete_discarded_blobs(discarded.iter());
785 }
786 }
787
788 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
797 let mut listener = self.event_listener.write();
798 if listener.is_empty() {
799 return
801 }
802
803 match tx {
804 AddedTransaction::Pending(tx) => {
805 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
806
807 listener.pending(transaction.hash(), replaced.clone());
808 for tx in promoted {
809 listener.pending(tx.hash(), None);
810 }
811 for tx in discarded {
812 listener.discarded(tx.hash());
813 }
814 }
815 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
816 listener.queued(transaction.hash(), queued_reason.clone());
817 if let Some(replaced) = replaced {
818 listener.replaced(replaced.clone(), *transaction.hash());
819 }
820 }
821 }
822 }
823
824 pub fn best_transactions(&self) -> BestTransactions<T> {
826 self.get_pool_data().best_transactions()
827 }
828
829 pub fn best_transactions_with_attributes(
832 &self,
833 best_transactions_attributes: BestTransactionsAttributes,
834 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
835 {
836 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
837 }
838
839 pub fn pending_transactions_max(
841 &self,
842 max: usize,
843 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
844 self.get_pool_data().pending_transactions_iter().take(max).collect()
845 }
846
847 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
849 self.get_pool_data().pending_transactions()
850 }
851
852 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
854 self.get_pool_data().queued_transactions()
855 }
856
857 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
859 let pool = self.get_pool_data();
860 AllPoolTransactions {
861 pending: pool.pending_transactions(),
862 queued: pool.queued_transactions(),
863 }
864 }
865
866 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
868 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
869 }
870
871 pub fn remove_transactions(
876 &self,
877 hashes: Vec<TxHash>,
878 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
879 if hashes.is_empty() {
880 return Vec::new()
881 }
882 let removed = self.pool.write().remove_transactions(hashes);
883
884 self.event_listener.write().discarded_many(&removed);
885
886 removed
887 }
888
889 pub fn remove_transactions_and_descendants(
892 &self,
893 hashes: Vec<TxHash>,
894 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
895 if hashes.is_empty() {
896 return Vec::new()
897 }
898 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
899
900 let mut listener = self.event_listener.write();
901
902 for tx in &removed {
903 listener.discarded(tx.hash());
904 }
905
906 removed
907 }
908
909 pub fn remove_transactions_by_sender(
911 &self,
912 sender: Address,
913 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
914 let sender_id = self.get_sender_id(sender);
915 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
916
917 self.event_listener.write().discarded_many(&removed);
918
919 removed
920 }
921
922 pub fn retain_unknown<A>(&self, announcement: &mut A)
924 where
925 A: HandleMempoolData,
926 {
927 if announcement.is_empty() {
928 return
929 }
930 let pool = self.get_pool_data();
931 announcement.retain_by_hash(|tx| !pool.contains(tx))
932 }
933
934 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
936 self.get_pool_data().get(tx_hash)
937 }
938
939 pub fn get_transactions_by_sender(
941 &self,
942 sender: Address,
943 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
944 let sender_id = self.get_sender_id(sender);
945 self.get_pool_data().get_transactions_by_sender(sender_id)
946 }
947
948 pub fn get_queued_transactions_by_sender(
950 &self,
951 sender: Address,
952 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
953 let sender_id = self.get_sender_id(sender);
954 self.get_pool_data().queued_txs_by_sender(sender_id)
955 }
956
957 pub fn pending_transactions_with_predicate(
959 &self,
960 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
961 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
962 self.get_pool_data().pending_transactions_with_predicate(predicate)
963 }
964
965 pub fn get_pending_transactions_by_sender(
967 &self,
968 sender: Address,
969 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
970 let sender_id = self.get_sender_id(sender);
971 self.get_pool_data().pending_txs_by_sender(sender_id)
972 }
973
974 pub fn get_highest_transaction_by_sender(
976 &self,
977 sender: Address,
978 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
979 let sender_id = self.get_sender_id(sender);
980 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
981 }
982
983 pub fn get_highest_consecutive_transaction_by_sender(
985 &self,
986 sender: Address,
987 on_chain_nonce: u64,
988 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
989 let sender_id = self.get_sender_id(sender);
990 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
991 sender_id.into_transaction_id(on_chain_nonce),
992 )
993 }
994
995 pub fn get_transaction_by_transaction_id(
997 &self,
998 transaction_id: &TransactionId,
999 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1000 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1001 }
1002
1003 pub fn get_transactions_by_origin(
1005 &self,
1006 origin: TransactionOrigin,
1007 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1008 self.get_pool_data()
1009 .all()
1010 .transactions_iter()
1011 .filter(|tx| tx.origin == origin)
1012 .cloned()
1013 .collect()
1014 }
1015
1016 pub fn get_pending_transactions_by_origin(
1018 &self,
1019 origin: TransactionOrigin,
1020 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1021 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1022 }
1023
1024 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1028 if txs.is_empty() {
1029 return Vec::new()
1030 }
1031 self.get_pool_data().get_all(txs).collect()
1032 }
1033
1034 fn get_all_propagatable(
1038 &self,
1039 txs: Vec<TxHash>,
1040 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1041 if txs.is_empty() {
1042 return Vec::new()
1043 }
1044 self.get_pool_data().get_all(txs).filter(|tx| tx.propagate).collect()
1045 }
1046
1047 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1049 if txs.0.is_empty() {
1050 return
1051 }
1052 let mut listener = self.event_listener.write();
1053
1054 if !listener.is_empty() {
1055 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1056 }
1057 }
1058
1059 pub fn len(&self) -> usize {
1061 self.get_pool_data().len()
1062 }
1063
1064 pub fn is_empty(&self) -> bool {
1066 self.get_pool_data().is_empty()
1067 }
1068
1069 pub fn is_exceeded(&self) -> bool {
1071 self.pool.read().is_exceeded()
1072 }
1073
1074 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1076 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1077 if let Err(err) = self.blob_store.insert(hash, blob) {
1078 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1079 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1080 }
1081 self.update_blob_store_metrics();
1082 }
1083
1084 pub fn delete_blob(&self, blob: TxHash) {
1086 let _ = self.blob_store.delete(blob);
1087 }
1088
1089 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1091 let _ = self.blob_store.delete_all(txs);
1092 }
1093
1094 pub fn cleanup_blobs(&self) {
1096 let stat = self.blob_store.cleanup();
1097 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1098 self.update_blob_store_metrics();
1099 }
1100
1101 fn update_blob_store_metrics(&self) {
1102 if let Some(data_size) = self.blob_store.data_size_hint() {
1103 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1104 }
1105 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1106 }
1107
1108 fn delete_discarded_blobs<'a>(
1110 &'a self,
1111 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1112 ) {
1113 let blob_txs = transactions
1114 .into_iter()
1115 .filter(|tx| tx.transaction.is_eip4844())
1116 .map(|tx| *tx.hash())
1117 .collect();
1118 self.delete_blobs(blob_txs);
1119 }
1120}
1121
1122impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1124 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1125 }
1126}
1127
1128#[derive(Debug, Clone)]
1130pub struct AddedPendingTransaction<T: PoolTransaction> {
1131 pub transaction: Arc<ValidPoolTransaction<T>>,
1133 pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1135 pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1137 pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1139}
1140
1141impl<T: PoolTransaction> AddedPendingTransaction<T> {
1142 pub(crate) fn pending_transactions(
1148 &self,
1149 kind: TransactionListenerKind,
1150 ) -> impl Iterator<Item = B256> + '_ {
1151 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1152 PendingTransactionIter { kind, iter }
1153 }
1154}
1155
1156pub(crate) struct PendingTransactionIter<Iter> {
1157 kind: TransactionListenerKind,
1158 iter: Iter,
1159}
1160
1161impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1162where
1163 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1164 T: PoolTransaction + 'a,
1165{
1166 type Item = B256;
1167
1168 fn next(&mut self) -> Option<Self::Item> {
1169 loop {
1170 let next = self.iter.next()?;
1171 if self.kind.is_propagate_only() && !next.propagate {
1172 continue
1173 }
1174 return Some(*next.hash())
1175 }
1176 }
1177}
1178
1179pub(crate) struct FullPendingTransactionIter<Iter> {
1181 kind: TransactionListenerKind,
1182 iter: Iter,
1183}
1184
1185impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1186where
1187 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1188 T: PoolTransaction + 'a,
1189{
1190 type Item = NewTransactionEvent<T>;
1191
1192 fn next(&mut self) -> Option<Self::Item> {
1193 loop {
1194 let next = self.iter.next()?;
1195 if self.kind.is_propagate_only() && !next.propagate {
1196 continue
1197 }
1198 return Some(NewTransactionEvent {
1199 subpool: SubPool::Pending,
1200 transaction: next.clone(),
1201 })
1202 }
1203 }
1204}
1205
1206#[derive(Debug, Clone)]
1208pub enum AddedTransaction<T: PoolTransaction> {
1209 Pending(AddedPendingTransaction<T>),
1211 Parked {
1214 transaction: Arc<ValidPoolTransaction<T>>,
1216 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1218 subpool: SubPool,
1220 queued_reason: Option<QueuedReason>,
1222 },
1223}
1224
1225impl<T: PoolTransaction> AddedTransaction<T> {
1226 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1228 match self {
1229 Self::Pending(tx) => Some(tx),
1230 _ => None,
1231 }
1232 }
1233
1234 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1236 match self {
1237 Self::Pending(tx) => tx.replaced.as_ref(),
1238 Self::Parked { replaced, .. } => replaced.as_ref(),
1239 }
1240 }
1241
1242 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1244 match self {
1245 Self::Pending(tx) => Some(&tx.discarded),
1246 Self::Parked { .. } => None,
1247 }
1248 }
1249
1250 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1252 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1253 }
1254
1255 pub fn hash(&self) -> &TxHash {
1257 match self {
1258 Self::Pending(tx) => tx.transaction.hash(),
1259 Self::Parked { transaction, .. } => transaction.hash(),
1260 }
1261 }
1262
1263 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1265 match self {
1266 Self::Pending(tx) => {
1267 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1268 }
1269 Self::Parked { transaction, subpool, .. } => {
1270 NewTransactionEvent { transaction, subpool }
1271 }
1272 }
1273 }
1274
1275 pub(crate) const fn subpool(&self) -> SubPool {
1277 match self {
1278 Self::Pending(_) => SubPool::Pending,
1279 Self::Parked { subpool, .. } => *subpool,
1280 }
1281 }
1282
1283 #[cfg(test)]
1285 pub(crate) fn id(&self) -> &TransactionId {
1286 match self {
1287 Self::Pending(added) => added.transaction.id(),
1288 Self::Parked { transaction, .. } => transaction.id(),
1289 }
1290 }
1291
1292 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1294 match self {
1295 Self::Pending(_) => None,
1296 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1297 }
1298 }
1299
1300 pub fn transaction_state(&self) -> AddedTransactionState {
1302 match self.subpool() {
1303 SubPool::Pending => AddedTransactionState::Pending,
1304 _ => {
1305 if let Some(reason) = self.queued_reason() {
1308 AddedTransactionState::Queued(reason.clone())
1309 } else {
1310 AddedTransactionState::Queued(QueuedReason::NonceGap)
1312 }
1313 }
1314 }
1315 }
1316}
1317
1318#[derive(Debug, Clone, PartialEq, Eq)]
1320pub enum QueuedReason {
1321 NonceGap,
1323 ParkedAncestors,
1325 InsufficientBalance,
1327 TooMuchGas,
1329 InsufficientBaseFee,
1331 InsufficientBlobFee,
1333}
1334
1335#[derive(Debug, Clone, PartialEq, Eq)]
1337pub enum AddedTransactionState {
1338 Pending,
1340 Queued(QueuedReason),
1342}
1343
1344impl AddedTransactionState {
1345 pub const fn is_queued(&self) -> bool {
1347 matches!(self, Self::Queued(_))
1348 }
1349
1350 pub const fn is_pending(&self) -> bool {
1352 matches!(self, Self::Pending)
1353 }
1354
1355 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1357 match self {
1358 Self::Queued(reason) => Some(reason),
1359 Self::Pending => None,
1360 }
1361 }
1362}
1363
1364#[derive(Debug, Clone, PartialEq, Eq)]
1366pub struct AddedTransactionOutcome {
1367 pub hash: TxHash,
1369 pub state: AddedTransactionState,
1371}
1372
1373impl AddedTransactionOutcome {
1374 pub const fn is_queued(&self) -> bool {
1376 self.state.is_queued()
1377 }
1378
1379 pub const fn is_pending(&self) -> bool {
1381 self.state.is_pending()
1382 }
1383}
1384
1385#[derive(Debug)]
1387pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1388 pub(crate) block_hash: B256,
1390 pub(crate) mined: Vec<TxHash>,
1392 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1394 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1396}
1397
1398impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1399 pub(crate) fn pending_transactions(
1405 &self,
1406 kind: TransactionListenerKind,
1407 ) -> impl Iterator<Item = B256> + '_ {
1408 let iter = self.promoted.iter();
1409 PendingTransactionIter { kind, iter }
1410 }
1411
1412 pub(crate) fn full_pending_transactions(
1418 &self,
1419 kind: TransactionListenerKind,
1420 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1421 let iter = self.promoted.iter();
1422 FullPendingTransactionIter { kind, iter }
1423 }
1424}
1425
1426#[cfg(test)]
1427mod tests {
1428 use crate::{
1429 blobstore::{BlobStore, InMemoryBlobStore},
1430 identifier::SenderId,
1431 test_utils::{MockTransaction, TestPoolBuilder},
1432 validate::ValidTransaction,
1433 BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1434 };
1435 use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1436 use alloy_primitives::Address;
1437 use std::{fs, path::PathBuf};
1438
1439 #[test]
1440 fn test_discard_blobs_on_blob_tx_eviction() {
1441 let blobs = {
1442 let json_content = fs::read_to_string(
1444 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1445 )
1446 .expect("Failed to read the blob data file");
1447
1448 let json_value: serde_json::Value =
1450 serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1451
1452 vec![
1454 json_value
1456 .get("data")
1457 .unwrap()
1458 .as_str()
1459 .expect("Data is not a valid string")
1460 .to_string(),
1461 ]
1462 };
1463
1464 let sidecar = BlobTransactionSidecarVariant::Eip4844(
1466 BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1467 );
1468
1469 let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1471
1472 let test_pool = &TestPoolBuilder::default()
1474 .with_config(PoolConfig { blob_limit, ..Default::default() })
1475 .pool;
1476
1477 test_pool
1479 .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1480
1481 let blob_store = InMemoryBlobStore::default();
1483
1484 for n in 0..blob_limit.max_txs + 10 {
1486 let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1488
1489 tx.set_size(1844674407370951);
1491
1492 if n < blob_limit.max_txs {
1494 blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1495 }
1496
1497 test_pool.add_transactions(
1499 TransactionOrigin::External,
1500 [TransactionValidationOutcome::Valid {
1501 balance: U256::from(1_000),
1502 state_nonce: 0,
1503 bytecode_hash: None,
1504 transaction: ValidTransaction::ValidWithSidecar {
1505 transaction: tx,
1506 sidecar: sidecar.clone(),
1507 },
1508 propagate: true,
1509 authorities: None,
1510 }],
1511 );
1512 }
1513
1514 assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1516
1517 assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1519
1520 assert_eq!(*test_pool.blob_store(), blob_store);
1522 }
1523
1524 #[test]
1525 fn test_auths_stored_in_identifiers() {
1526 let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1528
1529 let auth = Address::new([1; 20]);
1530 let tx = MockTransaction::eip7702();
1531
1532 test_pool.add_transactions(
1533 TransactionOrigin::Local,
1534 [TransactionValidationOutcome::Valid {
1535 balance: U256::from(1_000),
1536 state_nonce: 0,
1537 bytecode_hash: None,
1538 transaction: ValidTransaction::Valid(tx),
1539 propagate: true,
1540 authorities: Some(vec![auth]),
1541 }],
1542 );
1543
1544 let identifiers = test_pool.identifiers.read();
1545 assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1546 }
1547}