1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{
92 map::{AddressSet, HashSet},
93 Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103 fmt,
104 sync::{
105 atomic::{AtomicBool, Ordering},
106 Arc,
107 },
108 time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
/// Buffer size constant for pending-transaction listener channels.
///
/// NOTE(review): presumably the default for `PoolConfig::pending_tx_listener_buffer_size`;
/// the value is not referenced in this part of the file, so confirm against `PoolConfig`.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Buffer size constant for new-transaction listener channels.
///
/// NOTE(review): presumably the default for `PoolConfig::new_tx_listener_buffer_size`;
/// the value is not referenced in this part of the file, so confirm against `PoolConfig`.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the bounded channel created for every blob sidecar listener.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
/// Transaction pool internals.
///
/// Bundles the validator, the blob store, the lock-protected sub-pool state and all listener
/// registries that are notified when the pool changes.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Maps sender addresses to internal [`SenderId`]s.
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator; informed about new head blocks on canonical state changes.
    validator: V,
    /// Storage for blob sidecars of EIP-4844 transactions.
    blob_store: S,
    /// The pool state holding all transactions, guarded by a read/write lock.
    pool: RwLock<TxPool<T>>,
    /// Pool settings; also provides the listener channel buffer sizes.
    config: PoolConfig,
    /// Broadcasts transaction lifecycle events to subscribed event listeners.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Flag checked before locking `event_listener`, so emitting is skipped when nobody listens.
    has_event_listeners: AtomicBool,
    /// Listeners that receive the hashes of transactions that became pending.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners that receive full [`NewTransactionEvent`]s.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listeners that receive newly inserted blob sidecars.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics describing the blob store (sizes, entry counts, failed operations).
    blob_store_metrics: BlobStoreMetrics,
}
167
168impl<V, T, S> PoolInner<V, T, S>
171where
172 V: TransactionValidator,
173 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174 S: BlobStore,
175{
176 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178 Self {
179 identifiers: Default::default(),
180 validator,
181 event_listener: Default::default(),
182 has_event_listeners: AtomicBool::new(false),
183 pool: RwLock::new(TxPool::new(ordering, config.clone())),
184 pending_transaction_listener: Default::default(),
185 transaction_listener: Default::default(),
186 blob_transaction_sidecar_listener: Default::default(),
187 config,
188 blob_store,
189 blob_store_metrics: Default::default(),
190 }
191 }
192
193 pub const fn blob_store(&self) -> &S {
195 &self.blob_store
196 }
197
198 pub fn size(&self) -> PoolSize {
200 self.get_pool_data().size()
201 }
202
203 pub fn block_info(&self) -> BlockInfo {
205 self.get_pool_data().block_info()
206 }
207 pub fn set_block_info(&self, info: BlockInfo) {
212 let outcome = self.pool.write().set_block_info(info);
213
214 self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216 }
217
218 pub fn get_sender_id(&self, addr: Address) -> SenderId {
225 self.identifiers.write().sender_id_or_create(addr)
226 }
227
228 pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
233 self.identifiers.read().sender_id(addr)
234 }
235
236 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
238 self.identifiers.write().sender_ids_or_create(addrs)
239 }
240
241 pub fn unique_senders(&self) -> AddressSet {
243 self.get_pool_data().unique_senders()
244 }
245
246 fn changed_senders(
249 &self,
250 accs: impl Iterator<Item = ChangedAccount>,
251 ) -> FxHashMap<SenderId, SenderInfo> {
252 let identifiers = self.identifiers.read();
253 accs.into_iter()
254 .filter_map(|acc| {
255 let ChangedAccount { address, nonce, balance } = acc;
256 let sender_id = identifiers.sender_id(&address)?;
257 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
258 })
259 .collect()
260 }
261
262 pub const fn config(&self) -> &PoolConfig {
264 &self.config
265 }
266
267 pub const fn validator(&self) -> &V {
269 &self.validator
270 }
271
272 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
275 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
276 let listener = PendingTransactionHashListener { sender, kind };
277
278 let mut listeners = self.pending_transaction_listener.write();
279 listeners.retain(|l| !l.sender.is_closed());
281 listeners.push(listener);
282
283 rx
284 }
285
286 pub fn add_new_transaction_listener(
288 &self,
289 kind: TransactionListenerKind,
290 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
291 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
292 let listener = TransactionListener { sender, kind };
293
294 let mut listeners = self.transaction_listener.write();
295 listeners.retain(|l| !l.sender.is_closed());
297 listeners.push(listener);
298
299 rx
300 }
301 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
304 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
305 let listener = BlobTransactionSidecarListener { sender };
306 self.blob_transaction_sidecar_listener.lock().push(listener);
307 rx
308 }
309
310 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
313 if !self.get_pool_data().contains(&tx_hash) {
314 return None
315 }
316 let mut listener = self.event_listener.write();
317 let events = listener.subscribe(tx_hash);
318 self.mark_event_listener_installed();
319 Some(events)
320 }
321
322 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
324 let mut listener = self.event_listener.write();
325 let events = listener.subscribe_all();
326 self.mark_event_listener_installed();
327 events
328 }
329
330 #[inline]
331 fn has_event_listeners(&self) -> bool {
332 self.has_event_listeners.load(Ordering::Relaxed)
333 }
334
335 #[inline]
336 fn mark_event_listener_installed(&self) {
337 self.has_event_listeners.store(true, Ordering::Relaxed);
338 }
339
340 #[inline]
341 fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
342 if listener.is_empty() {
343 self.has_event_listeners.store(false, Ordering::Relaxed);
344 }
345 }
346
347 #[inline]
348 fn with_event_listener<F>(&self, emit: F)
349 where
350 F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
351 {
352 if !self.has_event_listeners() {
353 return
354 }
355 let mut listener = self.event_listener.write();
356 if !listener.is_empty() {
357 emit(&mut listener);
358 }
359 self.update_event_listener_state(&listener);
360 }
361
362 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
364 self.pool.read()
365 }
366
367 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
369 let mut out = Vec::new();
370 self.append_pooled_transactions(&mut out);
371 out
372 }
373
374 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
376 let mut out = Vec::new();
377 self.append_pooled_transactions_hashes(&mut out);
378 out
379 }
380
381 pub fn pooled_transactions_max(
383 &self,
384 max: usize,
385 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
386 if max == 0 {
387 return Vec::new()
388 }
389
390 let pool = self.get_pool_data();
391 let mut out = Vec::with_capacity(max.min(pool.all().len()));
392 out.extend(pool.all().transactions_iter().filter(|tx| tx.propagate).take(max).cloned());
393 out
394 }
395
396 pub fn append_pooled_transactions(
398 &self,
399 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
400 ) {
401 out.extend(
402 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
403 );
404 }
405
406 pub fn append_pooled_transaction_elements(
409 &self,
410 tx_hashes: &[TxHash],
411 limit: GetPooledTransactionLimit,
412 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
413 ) where
414 <V as TransactionValidator>::Transaction: EthPoolTransaction,
415 {
416 let transactions = self.get_all_propagatable(tx_hashes);
417 let mut size = 0;
418 for transaction in transactions {
419 let encoded_len = transaction.encoded_length();
420 let Some(pooled) = self.to_pooled_transaction(transaction) else {
421 continue;
422 };
423
424 size += encoded_len;
425 out.push(pooled.into_inner());
426
427 if limit.exceeds(size) {
428 break
429 }
430 }
431 }
432
433 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
436 out.extend(
437 self.get_pool_data()
438 .all()
439 .transactions_iter()
440 .filter(|tx| tx.propagate)
441 .map(|tx| *tx.hash()),
442 );
443 }
444
445 pub fn append_pooled_transactions_max(
448 &self,
449 max: usize,
450 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
451 ) {
452 out.extend(
453 self.get_pool_data()
454 .all()
455 .transactions_iter()
456 .filter(|tx| tx.propagate)
457 .take(max)
458 .cloned(),
459 );
460 }
461
462 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
464 if max == 0 {
465 return Vec::new();
466 }
467
468 let pool = self.get_pool_data();
469 let mut out = Vec::with_capacity(max.min(pool.all().len()));
470 out.extend(
471 pool.all().transactions_iter().filter(|tx| tx.propagate).take(max).map(|tx| *tx.hash()),
472 );
473 out
474 }
475
476 fn to_pooled_transaction(
481 &self,
482 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
483 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
484 where
485 <V as TransactionValidator>::Transaction: EthPoolTransaction,
486 {
487 if transaction.is_eip4844() {
488 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
489 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
490 } else {
491 transaction
492 .transaction
493 .clone_into_pooled()
494 .inspect_err(|err| {
495 debug!(
496 target: "txpool", %err,
497 "failed to convert transaction to pooled element; skipping",
498 );
499 })
500 .ok()
501 }
502 }
503
504 pub fn get_pooled_transaction_elements(
507 &self,
508 tx_hashes: Vec<TxHash>,
509 limit: GetPooledTransactionLimit,
510 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
511 where
512 <V as TransactionValidator>::Transaction: EthPoolTransaction,
513 {
514 let mut elements = Vec::new();
515 self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
516 elements.shrink_to_fit();
517 elements
518 }
519
520 pub fn get_pooled_transaction_element(
522 &self,
523 tx_hash: TxHash,
524 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
525 where
526 <V as TransactionValidator>::Transaction: EthPoolTransaction,
527 {
528 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
529 }
530
531 pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
533 trace!(target: "txpool", ?update, "updating pool on canonical state change");
534
535 let block_info = update.block_info();
536 let CanonicalStateUpdate {
537 new_tip, changed_accounts, mined_transactions, update_kind, ..
538 } = update;
539 self.validator.on_new_head_block(new_tip);
540
541 let changed_senders = self.changed_senders(changed_accounts.into_iter());
542
543 let outcome = self.pool.write().on_canonical_state_change(
545 block_info,
546 mined_transactions,
547 changed_senders,
548 update_kind,
549 );
550
551 self.delete_discarded_blobs(outcome.discarded.iter());
553
554 self.notify_on_new_state(outcome);
556 }
557
558 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
564 let changed_senders = self.changed_senders(accounts.into_iter());
565 let UpdateOutcome { promoted, discarded } =
566 self.pool.write().update_accounts(changed_senders);
567
568 self.notify_on_transaction_updates(promoted, discarded);
569 }
570
571 fn add_transaction(
579 &self,
580 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
581 origin: TransactionOrigin,
582 tx: TransactionValidationOutcome<T::Transaction>,
583 ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
584 match tx {
585 TransactionValidationOutcome::Valid {
586 balance,
587 state_nonce,
588 transaction,
589 propagate,
590 bytecode_hash,
591 authorities,
592 } => {
593 let sender_id = self.get_sender_id(transaction.sender());
594 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
595
596 let (transaction, blob_sidecar) = match transaction {
598 ValidTransaction::Valid(tx) => (tx, None),
599 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
600 debug_assert!(
601 transaction.is_eip4844(),
602 "validator returned sidecar for non EIP-4844 transaction"
603 );
604 (transaction, Some(sidecar))
605 }
606 };
607
608 let tx = ValidPoolTransaction {
609 transaction,
610 transaction_id,
611 propagate,
612 timestamp: Instant::now(),
613 origin,
614 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
615 };
616
617 let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
618 Ok(added) => added,
619 Err(err) => return (Err(err), None),
620 };
621 let hash = *added.hash();
622 let state = added.transaction_state();
623
624 let meta = AddedTransactionMeta { added, blob_sidecar };
625
626 (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
627 }
628 TransactionValidationOutcome::Invalid(tx, err) => {
629 self.with_event_listener(|listener| listener.invalid(tx.hash()));
630 (Err(PoolError::new(*tx.hash(), err)), None)
631 }
632 TransactionValidationOutcome::Error(tx_hash, err) => {
633 self.with_event_listener(|listener| listener.discarded(&tx_hash));
634 (Err(PoolError::other(tx_hash, err)), None)
635 }
636 }
637 }
638
639 pub fn add_transaction_and_subscribe(
641 &self,
642 origin: TransactionOrigin,
643 tx: TransactionValidationOutcome<T::Transaction>,
644 ) -> PoolResult<TransactionEvents> {
645 let listener = {
646 let mut listener = self.event_listener.write();
647 let events = listener.subscribe(tx.tx_hash());
648 self.mark_event_listener_installed();
649 events
650 };
651 let mut results = self.add_transactions(origin, std::iter::once(tx));
652 results.pop().expect("result length is the same as the input")?;
653 Ok(listener)
654 }
655
656 pub fn add_transactions(
661 &self,
662 origin: TransactionOrigin,
663 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
664 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
665 self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
666 }
667
668 pub fn add_transactions_with_origins(
671 &self,
672 transactions: impl IntoIterator<
673 Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
674 >,
675 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
676 let (mut results, added_metas, discarded) = {
678 let mut pool = self.pool.write();
679 let mut added_metas = Vec::new();
680
681 let results = transactions
682 .into_iter()
683 .map(|(origin, tx)| {
684 let (result, meta) = self.add_transaction(&mut pool, origin, tx);
685
686 if result.is_ok() &&
688 let Some(meta) = meta
689 {
690 added_metas.push(meta);
691 }
692
693 result
694 })
695 .collect::<Vec<_>>();
696
697 let discarded = if results.iter().any(Result::is_ok) {
699 pool.discard_worst()
700 } else {
701 Default::default()
702 };
703
704 (results, added_metas, discarded)
705 };
706
707 for meta in added_metas {
708 self.on_added_transaction(meta);
709 }
710
711 if !discarded.is_empty() {
712 self.delete_discarded_blobs(discarded.iter());
714 self.with_event_listener(|listener| listener.discarded_many(&discarded));
715
716 let discarded_hashes =
717 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
718
719 for res in &mut results {
722 if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
723 discarded_hashes.contains(hash)
724 {
725 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
726 }
727 }
728 };
729
730 results
731 }
732
733 fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
738 if let Some(sidecar) = meta.blob_sidecar {
740 let hash = *meta.added.hash();
741 self.on_new_blob_sidecar(&hash, &sidecar);
742 self.insert_blob(hash, sidecar);
743 }
744
745 if let Some(replaced) = meta.added.replaced_blob_transaction() {
747 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
748 self.delete_blob(replaced);
749 }
750
751 if let Some(discarded) = meta.added.discarded_transactions() {
753 self.delete_discarded_blobs(discarded.iter());
754 }
755
756 if let Some(pending) = meta.added.as_pending() {
758 self.on_new_pending_transaction(pending);
759 }
760
761 self.notify_event_listeners(&meta.added);
763
764 self.on_new_transaction(meta.added.into_new_transaction_event());
766 }
767
768 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
777 let mut needs_cleanup = false;
778
779 {
780 let listeners = self.pending_transaction_listener.read();
781 for listener in listeners.iter() {
782 if !listener.send_all(pending.pending_transactions(listener.kind)) {
783 needs_cleanup = true;
784 }
785 }
786 }
787
788 if needs_cleanup {
790 self.pending_transaction_listener
791 .write()
792 .retain(|listener| !listener.sender.is_closed());
793 }
794 }
795
796 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
805 let mut needs_cleanup = false;
806
807 {
808 let listeners = self.transaction_listener.read();
809 for listener in listeners.iter() {
810 if listener.kind.is_propagate_only() && !event.transaction.propagate {
811 if listener.sender.is_closed() {
812 needs_cleanup = true;
813 }
814 continue
816 }
817
818 if !listener.send(event.clone()) {
819 needs_cleanup = true;
820 }
821 }
822 }
823
824 if needs_cleanup {
826 self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
827 }
828 }
829
830 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
832 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
833 if sidecar_listeners.is_empty() {
834 return
835 }
836 let sidecar = Arc::new(sidecar.clone());
837 sidecar_listeners.retain_mut(|listener| {
838 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
839 match listener.sender.try_send(new_blob_event) {
840 Ok(()) => true,
841 Err(err) => {
842 if matches!(err, mpsc::error::TrySendError::Full(_)) {
843 debug!(
844 target: "txpool",
845 "[{:?}] failed to send blob sidecar; channel full",
846 sidecar,
847 );
848 true
849 } else {
850 false
851 }
852 }
853 }
854 })
855 }
856
857 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
859 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
860
861 let mut needs_pending_cleanup = false;
863 {
864 let listeners = self.pending_transaction_listener.read();
865 for listener in listeners.iter() {
866 if !listener.send_all(outcome.pending_transactions(listener.kind)) {
867 needs_pending_cleanup = true;
868 }
869 }
870 }
871 if needs_pending_cleanup {
872 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
873 }
874
875 let mut needs_tx_cleanup = false;
877 {
878 let listeners = self.transaction_listener.read();
879 for listener in listeners.iter() {
880 if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
881 needs_tx_cleanup = true;
882 }
883 }
884 }
885 if needs_tx_cleanup {
886 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
887 }
888
889 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
890
891 self.with_event_listener(|listener| {
893 for tx in &mined {
894 listener.mined(tx, block_hash);
895 }
896 for tx in &promoted {
897 listener.pending(tx.hash(), None);
898 }
899 for tx in &discarded {
900 listener.discarded(tx.hash());
901 }
902 })
903 }
904
905 pub fn notify_on_transaction_updates(
914 &self,
915 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
916 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
917 ) {
918 if !promoted.is_empty() {
920 let mut needs_pending_cleanup = false;
921 {
922 let listeners = self.pending_transaction_listener.read();
923 for listener in listeners.iter() {
924 let promoted_hashes = promoted.iter().filter_map(|tx| {
925 if listener.kind.is_propagate_only() && !tx.propagate {
926 None
927 } else {
928 Some(*tx.hash())
929 }
930 });
931 if !listener.send_all(promoted_hashes) {
932 needs_pending_cleanup = true;
933 }
934 }
935 }
936 if needs_pending_cleanup {
937 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
938 }
939
940 let mut needs_tx_cleanup = false;
942 {
943 let listeners = self.transaction_listener.read();
944 for listener in listeners.iter() {
945 let promoted_txs = promoted.iter().filter_map(|tx| {
946 if listener.kind.is_propagate_only() && !tx.propagate {
947 None
948 } else {
949 Some(NewTransactionEvent::pending(tx.clone()))
950 }
951 });
952 if !listener.send_all(promoted_txs) {
953 needs_tx_cleanup = true;
954 }
955 }
956 }
957 if needs_tx_cleanup {
958 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
959 }
960 }
961
962 self.with_event_listener(|listener| {
963 for tx in &promoted {
964 listener.pending(tx.hash(), None);
965 }
966 for tx in &discarded {
967 listener.discarded(tx.hash());
968 }
969 });
970
971 if !discarded.is_empty() {
972 self.delete_discarded_blobs(discarded.iter());
975 }
976 }
977
978 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
987 self.with_event_listener(|listener| match tx {
988 AddedTransaction::Pending(tx) => {
989 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
990
991 listener.pending(transaction.hash(), replaced.clone());
992 for tx in promoted {
993 listener.pending(tx.hash(), None);
994 }
995 for tx in discarded {
996 listener.discarded(tx.hash());
997 }
998 }
999 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
1000 listener.queued(transaction.hash(), queued_reason.clone());
1001 if let Some(replaced) = replaced {
1002 listener.replaced(replaced.clone(), *transaction.hash());
1003 }
1004 }
1005 });
1006 }
1007
1008 pub fn best_transactions(&self) -> BestTransactions<T> {
1010 self.get_pool_data().best_transactions()
1011 }
1012
1013 pub fn best_transactions_with_attributes(
1016 &self,
1017 best_transactions_attributes: BestTransactionsAttributes,
1018 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1019 {
1020 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1021 }
1022
1023 pub fn pending_transactions_max(
1025 &self,
1026 max: usize,
1027 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1028 self.get_pool_data().pending_transactions_iter().take(max).collect()
1029 }
1030
1031 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1033 self.get_pool_data().pending_transactions()
1034 }
1035
1036 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1038 self.get_pool_data().queued_transactions()
1039 }
1040
1041 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1043 let pool = self.get_pool_data();
1044 AllPoolTransactions {
1045 pending: pool.pending_transactions(),
1046 queued: pool.queued_transactions(),
1047 }
1048 }
1049
1050 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1052 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1053 }
1054
1055 pub fn remove_transactions(
1060 &self,
1061 hashes: Vec<TxHash>,
1062 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1063 if hashes.is_empty() {
1064 return Vec::new()
1065 }
1066 let removed = self.pool.write().remove_transactions(hashes);
1067
1068 self.with_event_listener(|listener| listener.discarded_many(&removed));
1069
1070 removed
1071 }
1072
1073 pub fn remove_transactions_and_descendants(
1076 &self,
1077 hashes: Vec<TxHash>,
1078 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1079 if hashes.is_empty() {
1080 return Vec::new()
1081 }
1082 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1083
1084 self.with_event_listener(|listener| {
1085 for tx in &removed {
1086 listener.discarded(tx.hash());
1087 }
1088 });
1089
1090 removed
1091 }
1092
1093 pub fn remove_transactions_by_sender(
1095 &self,
1096 sender: Address,
1097 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1098 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1099 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1100
1101 self.with_event_listener(|listener| listener.discarded_many(&removed));
1102
1103 removed
1104 }
1105
1106 pub fn prune_transactions(
1111 &self,
1112 hashes: Vec<TxHash>,
1113 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1114 if hashes.is_empty() {
1115 return Vec::new()
1116 }
1117
1118 self.pool.write().prune_transactions(hashes)
1119 }
1120
1121 pub fn retain_unknown<A>(&self, announcement: &mut A)
1123 where
1124 A: HandleMempoolData,
1125 {
1126 if announcement.is_empty() {
1127 return
1128 }
1129 let pool = self.get_pool_data();
1130 announcement.retain_by_hash(|tx| !pool.contains(tx))
1131 }
1132
1133 pub fn retain_contains<A>(&self, announcement: &mut A)
1135 where
1136 A: HandleMempoolData,
1137 {
1138 if announcement.is_empty() {
1139 return
1140 }
1141 let pool = self.get_pool_data();
1142 announcement.retain_by_hash(|tx| pool.contains(tx))
1143 }
1144
1145 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1147 self.get_pool_data().get(tx_hash)
1148 }
1149
1150 pub fn get_transactions_by_sender(
1152 &self,
1153 sender: Address,
1154 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1155 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1156 self.get_pool_data().get_transactions_by_sender(sender_id)
1157 }
1158
1159 pub fn get_pending_transaction_by_sender_and_nonce(
1161 &self,
1162 sender: Address,
1163 nonce: u64,
1164 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1165 let sender_id = self.sender_id(&sender)?;
1166 self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1167 }
1168
1169 pub fn get_queued_transactions_by_sender(
1171 &self,
1172 sender: Address,
1173 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1174 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1175 self.get_pool_data().queued_txs_by_sender(sender_id)
1176 }
1177
1178 pub fn pending_transactions_with_predicate(
1180 &self,
1181 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1182 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1183 self.get_pool_data().pending_transactions_with_predicate(predicate)
1184 }
1185
1186 pub fn get_pending_transactions_by_sender(
1188 &self,
1189 sender: Address,
1190 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1191 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1192 self.get_pool_data().pending_txs_by_sender(sender_id)
1193 }
1194
1195 pub fn get_highest_transaction_by_sender(
1197 &self,
1198 sender: Address,
1199 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1200 let sender_id = self.sender_id(&sender)?;
1201 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1202 }
1203
1204 pub fn get_highest_consecutive_transaction_by_sender(
1206 &self,
1207 sender: Address,
1208 on_chain_nonce: u64,
1209 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1210 let sender_id = self.sender_id(&sender)?;
1211 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1212 sender_id.into_transaction_id(on_chain_nonce),
1213 )
1214 }
1215
1216 pub fn get_transaction_by_transaction_id(
1218 &self,
1219 transaction_id: &TransactionId,
1220 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1221 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1222 }
1223
1224 pub fn get_transactions_by_origin(
1226 &self,
1227 origin: TransactionOrigin,
1228 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1229 self.get_pool_data()
1230 .all()
1231 .transactions_iter()
1232 .filter(|tx| tx.origin == origin)
1233 .cloned()
1234 .collect()
1235 }
1236
1237 pub fn get_pending_transactions_by_origin(
1239 &self,
1240 origin: TransactionOrigin,
1241 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1242 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1243 }
1244
1245 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1249 if txs.is_empty() {
1250 return Vec::new()
1251 }
1252 self.get_pool_data().get_all(txs).collect()
1253 }
1254
1255 fn get_all_propagatable(
1259 &self,
1260 txs: &[TxHash],
1261 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1262 if txs.is_empty() {
1263 return Vec::new()
1264 }
1265 let pool = self.get_pool_data();
1266 txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1267 }
1268
1269 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1271 if txs.is_empty() {
1272 return
1273 }
1274 self.with_event_listener(|listener| {
1275 txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1276 });
1277 }
1278
1279 pub fn len(&self) -> usize {
1281 self.get_pool_data().len()
1282 }
1283
1284 pub fn is_empty(&self) -> bool {
1286 self.get_pool_data().is_empty()
1287 }
1288
1289 pub fn is_exceeded(&self) -> bool {
1291 self.pool.read().is_exceeded()
1292 }
1293
1294 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1296 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1297 if let Err(err) = self.blob_store.insert(hash, blob) {
1298 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1299 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1300 }
1301 self.update_blob_store_metrics();
1302 }
1303
1304 pub fn delete_blob(&self, blob: TxHash) {
1306 let _ = self.blob_store.delete(blob);
1307 }
1308
1309 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1311 let _ = self.blob_store.delete_all(txs);
1312 }
1313
1314 pub fn cleanup_blobs(&self) {
1316 let stat = self.blob_store.cleanup();
1317 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1318 self.update_blob_store_metrics();
1319 }
1320
1321 fn update_blob_store_metrics(&self) {
1322 if let Some(data_size) = self.blob_store.data_size_hint() {
1323 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1324 }
1325 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1326 }
1327
1328 fn delete_discarded_blobs<'a>(
1330 &'a self,
1331 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1332 ) {
1333 let blob_txs = transactions
1334 .into_iter()
1335 .filter(|tx| tx.transaction.is_eip4844())
1336 .map(|tx| *tx.hash())
1337 .collect();
1338 self.delete_blobs(blob_txs);
1339 }
1340}
1341
1342impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1344 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1345 }
1346}
1347
1348#[derive(Debug)]
1353struct AddedTransactionMeta<T: PoolTransaction> {
1354 added: AddedTransaction<T>,
1356 blob_sidecar: Option<BlobTransactionSidecarVariant>,
1358}
1359
1360#[derive(Debug, Clone)]
1362pub struct AddedPendingTransaction<T: PoolTransaction> {
1363 pub transaction: Arc<ValidPoolTransaction<T>>,
1365 pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1367 pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1369 pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1371}
1372
1373impl<T: PoolTransaction> AddedPendingTransaction<T> {
1374 pub(crate) fn pending_transactions(
1380 &self,
1381 kind: TransactionListenerKind,
1382 ) -> impl Iterator<Item = B256> + '_ {
1383 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1384 PendingTransactionIter { kind, iter }
1385 }
1386}
1387
/// Iterator adapter that turns pending transactions into their hashes.
///
/// Its `Iterator` impl skips transactions whose `propagate` flag is unset when `kind` is
/// propagate-only.
pub(crate) struct PendingTransactionIter<Iter> {
    /// Listener kind that decides whether non-propagated transactions are filtered out.
    kind: TransactionListenerKind,
    /// Underlying iterator over the candidate transactions.
    iter: Iter,
}
1392
1393impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1394where
1395 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1396 T: PoolTransaction + 'a,
1397{
1398 type Item = B256;
1399
1400 fn next(&mut self) -> Option<Self::Item> {
1401 loop {
1402 let next = self.iter.next()?;
1403 if self.kind.is_propagate_only() && !next.propagate {
1404 continue
1405 }
1406 return Some(*next.hash())
1407 }
1408 }
1409}
1410
/// Iterator adapter that turns pending transactions into [`NewTransactionEvent`]s.
///
/// Its `Iterator` impl skips transactions whose `propagate` flag is unset when `kind` is
/// propagate-only.
pub(crate) struct FullPendingTransactionIter<Iter> {
    /// Listener kind that decides whether non-propagated transactions are filtered out.
    kind: TransactionListenerKind,
    /// Underlying iterator over the candidate transactions.
    iter: Iter,
}
1416
1417impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1418where
1419 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1420 T: PoolTransaction + 'a,
1421{
1422 type Item = NewTransactionEvent<T>;
1423
1424 fn next(&mut self) -> Option<Self::Item> {
1425 loop {
1426 let next = self.iter.next()?;
1427 if self.kind.is_propagate_only() && !next.propagate {
1428 continue
1429 }
1430 return Some(NewTransactionEvent {
1431 subpool: SubPool::Pending,
1432 transaction: next.clone(),
1433 })
1434 }
1435 }
1436}
1437
1438#[derive(Debug, Clone)]
1440pub enum AddedTransaction<T: PoolTransaction> {
1441 Pending(AddedPendingTransaction<T>),
1443 Parked {
1446 transaction: Arc<ValidPoolTransaction<T>>,
1448 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1450 subpool: SubPool,
1452 queued_reason: Option<QueuedReason>,
1454 },
1455}
1456
1457impl<T: PoolTransaction> AddedTransaction<T> {
1458 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1460 match self {
1461 Self::Pending(tx) => Some(tx),
1462 _ => None,
1463 }
1464 }
1465
1466 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1468 match self {
1469 Self::Pending(tx) => tx.replaced.as_ref(),
1470 Self::Parked { replaced, .. } => replaced.as_ref(),
1471 }
1472 }
1473
1474 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1476 match self {
1477 Self::Pending(tx) => Some(&tx.discarded),
1478 Self::Parked { .. } => None,
1479 }
1480 }
1481
1482 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1484 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1485 }
1486
1487 pub fn hash(&self) -> &TxHash {
1489 match self {
1490 Self::Pending(tx) => tx.transaction.hash(),
1491 Self::Parked { transaction, .. } => transaction.hash(),
1492 }
1493 }
1494
1495 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1497 match self {
1498 Self::Pending(tx) => {
1499 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1500 }
1501 Self::Parked { transaction, subpool, .. } => {
1502 NewTransactionEvent { transaction, subpool }
1503 }
1504 }
1505 }
1506
1507 pub(crate) const fn subpool(&self) -> SubPool {
1509 match self {
1510 Self::Pending(_) => SubPool::Pending,
1511 Self::Parked { subpool, .. } => *subpool,
1512 }
1513 }
1514
1515 #[cfg(test)]
1517 pub(crate) fn id(&self) -> &TransactionId {
1518 match self {
1519 Self::Pending(added) => added.transaction.id(),
1520 Self::Parked { transaction, .. } => transaction.id(),
1521 }
1522 }
1523
1524 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1526 match self {
1527 Self::Pending(_) => None,
1528 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1529 }
1530 }
1531
1532 pub fn transaction_state(&self) -> AddedTransactionState {
1534 match self.subpool() {
1535 SubPool::Pending => AddedTransactionState::Pending,
1536 _ => {
1537 if let Some(reason) = self.queued_reason() {
1540 AddedTransactionState::Queued(reason.clone())
1541 } else {
1542 AddedTransactionState::Queued(QueuedReason::NonceGap)
1544 }
1545 }
1546 }
1547 }
1548}
1549
/// Reason a transaction is reported as queued rather than pending.
///
/// NOTE(review): the exact condition behind each variant is decided where the reason is
/// produced, outside this section; the per-variant notes below only restate the variant names.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum QueuedReason {
    /// Nonce gap. Also the fallback used by `AddedTransaction::transaction_state` when no
    /// reason was recorded.
    NonceGap,
    /// Parked ancestors.
    ParkedAncestors,
    /// Insufficient balance.
    InsufficientBalance,
    /// Too much gas.
    TooMuchGas,
    /// Insufficient base fee.
    InsufficientBaseFee,
    /// Insufficient blob fee.
    InsufficientBlobFee,
}
1566
1567#[derive(Debug, Clone, PartialEq, Eq)]
1569pub enum AddedTransactionState {
1570 Pending,
1572 Queued(QueuedReason),
1574}
1575
1576impl AddedTransactionState {
1577 pub const fn is_queued(&self) -> bool {
1579 matches!(self, Self::Queued(_))
1580 }
1581
1582 pub const fn is_pending(&self) -> bool {
1584 matches!(self, Self::Pending)
1585 }
1586
1587 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1589 match self {
1590 Self::Queued(reason) => Some(reason),
1591 Self::Pending => None,
1592 }
1593 }
1594}
1595
1596#[derive(Debug, Clone, PartialEq, Eq)]
1598pub struct AddedTransactionOutcome {
1599 pub hash: TxHash,
1601 pub state: AddedTransactionState,
1603}
1604
1605impl AddedTransactionOutcome {
1606 pub const fn is_queued(&self) -> bool {
1608 self.state.is_queued()
1609 }
1610
1611 pub const fn is_pending(&self) -> bool {
1613 self.state.is_pending()
1614 }
1615}
1616
1617#[derive(Debug)]
1619pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1620 pub(crate) block_hash: B256,
1622 pub(crate) mined: Vec<TxHash>,
1624 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1626 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1628}
1629
1630impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1631 pub(crate) fn pending_transactions(
1637 &self,
1638 kind: TransactionListenerKind,
1639 ) -> impl Iterator<Item = B256> + '_ {
1640 let iter = self.promoted.iter();
1641 PendingTransactionIter { kind, iter }
1642 }
1643
1644 pub(crate) fn full_pending_transactions(
1650 &self,
1651 kind: TransactionListenerKind,
1652 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1653 let iter = self.promoted.iter();
1654 FullPendingTransactionIter { kind, iter }
1655 }
1656}
1657
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool limit allows and checks that the
    /// pool's blob store ends up holding exactly the sidecars of the first `max_txs`
    /// transactions.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        let blobs = {
            // Load the blob fixture that ships with the crate.
            let json_content = fs::read_to_string(
                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
            )
            .expect("Failed to read the blob data file");

            // Parse the fixture as generic JSON.
            let json_value: serde_json::Value =
                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");

            // A single-element list holding the string stored under the "data" key.
            vec![
                json_value
                    .get("data")
                    .unwrap()
                    .as_str()
                    .expect("Data is not a valid string")
                    .to_string(),
            ]
        };

        // Build an EIP-4844 sidecar from the hex-encoded blob.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
        );

        // Blob sub-pool limit; `max_txs` is read back below to drive the loop and assertions.
        // NOTE(review): presumably 1000 is the transaction count limit and `usize::MAX` the size
        // limit — confirm against `SubPoolLimit::new`.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Pool under test, configured with the blob limit above.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        // Set a pending blob fee on the pool's block info.
        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Reference store that mirrors what the pool's blob store is expected to contain.
        let blob_store = InMemoryBlobStore::default();

        // Add 10 more transactions than `max_txs`.
        for n in 0..blob_limit.max_txs + 10 {
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Give every transaction the same fixed size; the `blob_size` assertion below depends
            // on it.
            tx.set_size(1844674407370951);

            // Only the first `max_txs` sidecars are expected to remain stored.
            if n < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            // Submit the transaction together with its sidecar as an already validated outcome.
            test_pool.add_transactions(
                TransactionOrigin::External,
                [TransactionValidationOutcome::Valid {
                    balance: U256::from(1_000),
                    state_nonce: 0,
                    bytecode_hash: None,
                    transaction: ValidTransaction::ValidWithSidecar {
                        transaction: tx,
                        sidecar: sidecar.clone(),
                    },
                    propagate: true,
                    authorities: None,
                }],
            );
        }

        // The blob sub-pool holds exactly `max_txs` transactions.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // 1000 transactions of 1844674407370951 each.
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // The pool's blob store matches the reference store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    /// Checks that an authority address supplied with a validated EIP-7702 transaction gets a
    /// sender id in the pool's identifiers.
    #[test]
    fn test_auths_stored_in_identifiers() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let auth = Address::new([1; 20]);
        let tx = MockTransaction::eip7702();

        test_pool.add_transactions(
            TransactionOrigin::Local,
            [TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(tx),
                propagate: true,
                authorities: Some(vec![auth]),
            }],
        );

        // NOTE(review): id 1 presumably follows id 0 assigned to the transaction's sender —
        // confirm against the identifier allocation order.
        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
    }

    /// Checks that sender-based queries and removals for an address the pool has never seen
    /// return empty results and leave the address without a sender id.
    #[test]
    fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
        let sender = Address::new([9; 20]);

        // No id before any query.
        assert_eq!(test_pool.sender_id(&sender), None);
        assert!(test_pool.get_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_pending_transaction_by_sender_and_nonce(sender, 0).is_none());
        assert!(test_pool.get_queued_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_pending_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_highest_transaction_by_sender(sender).is_none());
        assert!(test_pool.get_highest_consecutive_transaction_by_sender(sender, 0).is_none());
        assert!(test_pool.remove_transactions_by_sender(sender).is_empty());
        // Still no id after all of the calls above.
        assert_eq!(test_pool.sender_id(&sender), None);
    }
}