1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{
92 map::{AddressSet, HashSet},
93 Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103 fmt,
104 sync::{
105 atomic::{AtomicBool, Ordering},
106 Arc,
107 },
108 time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
/// Buffer size constant for channels that deliver pending transaction hashes.
///
/// NOTE(review): not referenced in this part of the file, where the channel capacity is read from
/// `PoolConfig`; presumably this is the config default — confirm.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Buffer size constant for channels that deliver new transaction events.
///
/// NOTE(review): not referenced in this part of the file, where the channel capacity is read from
/// `PoolConfig`; presumably this is the config default — confirm.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of each bounded channel handed out by `add_blob_sidecar_listener`.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
/// Shared inner state of the transaction pool.
///
/// Bundles the transaction storage with the validator, the blob store and every kind of listener
/// that can be registered. All mutable state sits behind locks or atomics, so the methods only
/// need `&self`.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Maps sender addresses to their internal [`SenderId`].
    identifiers: RwLock<SenderIdentifiers>,
    /// The transaction validator; informed about new head blocks on canonical state changes.
    validator: V,
    /// Storage for the blob sidecars of EIP-4844 transactions.
    blob_store: S,
    /// The transaction storage itself.
    pool: RwLock<TxPool<T>>,
    /// Pool settings; also supplies the capacities of the listener channels.
    config: PoolConfig,
    /// Broadcasts per-transaction events to subscribers.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Flag that lets emitters skip locking `event_listener` while nobody is subscribed.
    has_event_listeners: AtomicBool,
    /// Listeners that receive the hashes of transactions that became pending.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners that receive full new-transaction events.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listeners that receive the sidecars of newly added blob transactions.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics describing the blob store (entry count, byte size, failed inserts and deletes).
    blob_store_metrics: BlobStoreMetrics,
}
167
168impl<V, T, S> PoolInner<V, T, S>
171where
172 V: TransactionValidator,
173 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174 S: BlobStore,
175{
176 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178 Self {
179 identifiers: Default::default(),
180 validator,
181 event_listener: Default::default(),
182 has_event_listeners: AtomicBool::new(false),
183 pool: RwLock::new(TxPool::new(ordering, config.clone())),
184 pending_transaction_listener: Default::default(),
185 transaction_listener: Default::default(),
186 blob_transaction_sidecar_listener: Default::default(),
187 config,
188 blob_store,
189 blob_store_metrics: Default::default(),
190 }
191 }
192
    /// Returns a reference to the blob store held by this pool.
    pub const fn blob_store(&self) -> &S {
        &self.blob_store
    }
197
198 pub fn size(&self) -> PoolSize {
200 self.get_pool_data().size()
201 }
202
203 pub fn block_info(&self) -> BlockInfo {
205 self.get_pool_data().block_info()
206 }
207 pub fn set_block_info(&self, info: BlockInfo) {
212 let outcome = self.pool.write().set_block_info(info);
213
214 self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216 }
217
218 pub fn get_sender_id(&self, addr: Address) -> SenderId {
225 self.identifiers.write().sender_id_or_create(addr)
226 }
227
228 pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
233 self.identifiers.read().sender_id(addr)
234 }
235
236 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
238 self.identifiers.write().sender_ids_or_create(addrs)
239 }
240
241 pub fn unique_senders(&self) -> AddressSet {
243 self.get_pool_data().unique_senders()
244 }
245
246 fn changed_senders(
249 &self,
250 accs: impl Iterator<Item = ChangedAccount>,
251 ) -> FxHashMap<SenderId, SenderInfo> {
252 let identifiers = self.identifiers.read();
253 accs.into_iter()
254 .filter_map(|acc| {
255 let ChangedAccount { address, nonce, balance } = acc;
256 let sender_id = identifiers.sender_id(&address)?;
257 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
258 })
259 .collect()
260 }
261
    /// Returns the configuration this pool was created with.
    pub const fn config(&self) -> &PoolConfig {
        &self.config
    }
266
    /// Returns a reference to the transaction validator held by this pool.
    pub const fn validator(&self) -> &V {
        &self.validator
    }
271
272 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
275 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
276 let listener = PendingTransactionHashListener { sender, kind };
277
278 let mut listeners = self.pending_transaction_listener.write();
279 listeners.retain(|l| !l.sender.is_closed());
281 listeners.push(listener);
282
283 rx
284 }
285
286 pub fn add_new_transaction_listener(
288 &self,
289 kind: TransactionListenerKind,
290 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
291 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
292 let listener = TransactionListener { sender, kind };
293
294 let mut listeners = self.transaction_listener.write();
295 listeners.retain(|l| !l.sender.is_closed());
297 listeners.push(listener);
298
299 rx
300 }
301 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
304 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
305 let listener = BlobTransactionSidecarListener { sender };
306 self.blob_transaction_sidecar_listener.lock().push(listener);
307 rx
308 }
309
310 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
313 if !self.get_pool_data().contains(&tx_hash) {
314 return None
315 }
316 let mut listener = self.event_listener.write();
317 let events = listener.subscribe(tx_hash);
318 self.mark_event_listener_installed();
319 Some(events)
320 }
321
322 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
324 let mut listener = self.event_listener.write();
325 let events = listener.subscribe_all();
326 self.mark_event_listener_installed();
327 events
328 }
329
    /// Returns whether the event-listener flag is currently set.
    ///
    /// This is a relaxed atomic load, used as a cheap pre-check before taking the broadcaster
    /// lock.
    #[inline]
    fn has_event_listeners(&self) -> bool {
        self.has_event_listeners.load(Ordering::Relaxed)
    }
334
    /// Sets the event-listener flag so that emitters start locking the broadcaster.
    #[inline]
    fn mark_event_listener_installed(&self) {
        self.has_event_listeners.store(true, Ordering::Relaxed);
    }
339
340 #[inline]
341 fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
342 if listener.is_empty() {
343 self.has_event_listeners.store(false, Ordering::Relaxed);
344 }
345 }
346
347 #[inline]
348 fn with_event_listener<F>(&self, emit: F)
349 where
350 F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
351 {
352 if !self.has_event_listeners() {
353 return
354 }
355 let mut listener = self.event_listener.write();
356 if !listener.is_empty() {
357 emit(&mut listener);
358 }
359 self.update_event_listener_state(&listener);
360 }
361
    /// Acquires a read lock on the pool and returns the guard.
    ///
    /// The pool cannot be write-locked while the returned guard is alive.
    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
        self.pool.read()
    }
366
367 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
369 let mut out = Vec::new();
370 self.append_pooled_transactions(&mut out);
371 out
372 }
373
374 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
376 let mut out = Vec::new();
377 self.append_pooled_transactions_hashes(&mut out);
378 out
379 }
380
381 pub fn pooled_transactions_max(
383 &self,
384 max: usize,
385 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
386 let mut out = Vec::new();
387 self.append_pooled_transactions_max(max, &mut out);
388 out
389 }
390
391 pub fn append_pooled_transactions(
393 &self,
394 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
395 ) {
396 out.extend(
397 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
398 );
399 }
400
401 pub fn append_pooled_transaction_elements(
404 &self,
405 tx_hashes: &[TxHash],
406 limit: GetPooledTransactionLimit,
407 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
408 ) where
409 <V as TransactionValidator>::Transaction: EthPoolTransaction,
410 {
411 let transactions = self.get_all_propagatable(tx_hashes);
412 let mut size = 0;
413 for transaction in transactions {
414 let encoded_len = transaction.encoded_length();
415 let Some(pooled) = self.to_pooled_transaction(transaction) else {
416 continue;
417 };
418
419 size += encoded_len;
420 out.push(pooled.into_inner());
421
422 if limit.exceeds(size) {
423 break
424 }
425 }
426 }
427
428 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
431 out.extend(
432 self.get_pool_data()
433 .all()
434 .transactions_iter()
435 .filter(|tx| tx.propagate)
436 .map(|tx| *tx.hash()),
437 );
438 }
439
440 pub fn append_pooled_transactions_max(
443 &self,
444 max: usize,
445 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
446 ) {
447 out.extend(
448 self.get_pool_data()
449 .all()
450 .transactions_iter()
451 .filter(|tx| tx.propagate)
452 .take(max)
453 .cloned(),
454 );
455 }
456
457 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
459 if max == 0 {
460 return Vec::new();
461 }
462 self.get_pool_data()
463 .all()
464 .transactions_iter()
465 .filter(|tx| tx.propagate)
466 .take(max)
467 .map(|tx| *tx.hash())
468 .collect()
469 }
470
471 fn to_pooled_transaction(
476 &self,
477 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
478 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
479 where
480 <V as TransactionValidator>::Transaction: EthPoolTransaction,
481 {
482 if transaction.is_eip4844() {
483 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
484 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
485 } else {
486 transaction
487 .transaction
488 .clone_into_pooled()
489 .inspect_err(|err| {
490 debug!(
491 target: "txpool", %err,
492 "failed to convert transaction to pooled element; skipping",
493 );
494 })
495 .ok()
496 }
497 }
498
499 pub fn get_pooled_transaction_elements(
502 &self,
503 tx_hashes: Vec<TxHash>,
504 limit: GetPooledTransactionLimit,
505 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
506 where
507 <V as TransactionValidator>::Transaction: EthPoolTransaction,
508 {
509 let mut elements = Vec::new();
510 self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
511 elements.shrink_to_fit();
512 elements
513 }
514
515 pub fn get_pooled_transaction_element(
517 &self,
518 tx_hash: TxHash,
519 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
520 where
521 <V as TransactionValidator>::Transaction: EthPoolTransaction,
522 {
523 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
524 }
525
526 pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
528 trace!(target: "txpool", ?update, "updating pool on canonical state change");
529
530 let block_info = update.block_info();
531 let CanonicalStateUpdate {
532 new_tip, changed_accounts, mined_transactions, update_kind, ..
533 } = update;
534 self.validator.on_new_head_block(new_tip);
535
536 let changed_senders = self.changed_senders(changed_accounts.into_iter());
537
538 let outcome = self.pool.write().on_canonical_state_change(
540 block_info,
541 mined_transactions,
542 changed_senders,
543 update_kind,
544 );
545
546 self.delete_discarded_blobs(outcome.discarded.iter());
548
549 self.notify_on_new_state(outcome);
551 }
552
553 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
559 let changed_senders = self.changed_senders(accounts.into_iter());
560 let UpdateOutcome { promoted, discarded } =
561 self.pool.write().update_accounts(changed_senders);
562
563 self.notify_on_transaction_updates(promoted, discarded);
564 }
565
    /// Inserts a single validated transaction into the already write-locked `pool`.
    ///
    /// Returns the insertion result together with optional metadata. The metadata is `Some` only
    /// when the transaction was inserted; it carries what the pool reported about the insertion
    /// plus the blob sidecar (if any), so the caller can do the follow-up work later.
    ///
    /// For `Invalid` and `Error` validation outcomes nothing is inserted: event listeners are
    /// notified and a [`PoolError`] is returned.
    fn add_transaction(
        &self,
        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
        origin: TransactionOrigin,
        tx: TransactionValidationOutcome<T::Transaction>,
    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
        match tx {
            TransactionValidationOutcome::Valid {
                balance,
                state_nonce,
                transaction,
                propagate,
                bytecode_hash,
                authorities,
            } => {
                // Resolve (or create) the sender's internal id; together with the nonce it forms
                // the transaction's id.
                let sender_id = self.get_sender_id(transaction.sender());
                let transaction_id = TransactionId::new(sender_id, transaction.nonce());

                // Split off the sidecar so it can be handed back to the caller via the metadata.
                let (transaction, blob_sidecar) = match transaction {
                    ValidTransaction::Valid(tx) => (tx, None),
                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
                        debug_assert!(
                            transaction.is_eip4844(),
                            "validator returned sidecar for non EIP-4844 transaction"
                        );
                        (transaction, Some(sidecar))
                    }
                };

                let tx = ValidPoolTransaction {
                    transaction,
                    transaction_id,
                    propagate,
                    timestamp: Instant::now(),
                    origin,
                    // Authorities are mapped to sender ids as well, creating ids where needed.
                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
                };

                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
                    Ok(added) => added,
                    // Rejected by the pool: no metadata, no follow-up work.
                    Err(err) => return (Err(err), None),
                };
                // Read these before `added` is moved into the metadata.
                let hash = *added.hash();
                let state = added.transaction_state();

                let meta = AddedTransactionMeta { added, blob_sidecar };

                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
            }
            TransactionValidationOutcome::Invalid(tx, err) => {
                self.with_event_listener(|listener| listener.invalid(tx.hash()));
                (Err(PoolError::new(*tx.hash(), err)), None)
            }
            TransactionValidationOutcome::Error(tx_hash, err) => {
                self.with_event_listener(|listener| listener.discarded(&tx_hash));
                (Err(PoolError::other(tx_hash, err)), None)
            }
        }
    }
633
634 pub fn add_transaction_and_subscribe(
636 &self,
637 origin: TransactionOrigin,
638 tx: TransactionValidationOutcome<T::Transaction>,
639 ) -> PoolResult<TransactionEvents> {
640 let listener = {
641 let mut listener = self.event_listener.write();
642 let events = listener.subscribe(tx.tx_hash());
643 self.mark_event_listener_installed();
644 events
645 };
646 let mut results = self.add_transactions(origin, std::iter::once(tx));
647 results.pop().expect("result length is the same as the input")?;
648 Ok(listener)
649 }
650
651 pub fn add_transactions(
656 &self,
657 origin: TransactionOrigin,
658 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
659 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
660 self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
661 }
662
    /// Adds a batch of validated transactions, each tagged with its own origin.
    ///
    /// All insertions run under a single pool write lock; blob store updates and listener
    /// notifications happen after that lock has been released. One result per input transaction
    /// is returned, in input order.
    ///
    /// If at least one insertion succeeded, the pool is asked to discard its worst transactions.
    /// A transaction from this batch that gets discarded this way is reported as
    /// [`PoolErrorKind::DiscardedOnInsert`] instead of `Ok`.
    pub fn add_transactions_with_origins(
        &self,
        transactions: impl IntoIterator<
            Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
        >,
    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
        let (mut results, added_metas, discarded) = {
            // The write guard lives only inside this block.
            let mut pool = self.pool.write();
            let mut added_metas = Vec::new();

            let results = transactions
                .into_iter()
                .map(|(origin, tx)| {
                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);

                    // Keep the metadata of successful insertions for the follow-up work below.
                    if result.is_ok() &&
                        let Some(meta) = meta
                    {
                        added_metas.push(meta);
                    }

                    result
                })
                .collect::<Vec<_>>();

            // Only ask the pool to discard transactions if something was actually added.
            let discarded = if results.iter().any(Result::is_ok) {
                pool.discard_worst()
            } else {
                Default::default()
            };

            (results, added_metas, discarded)
        };

        // Follow-up work for every inserted transaction, now that the pool is unlocked.
        for meta in added_metas {
            self.on_added_transaction(meta);
        }

        if !discarded.is_empty() {
            self.delete_discarded_blobs(discarded.iter());
            self.with_event_listener(|listener| listener.discarded_many(&discarded));

            let discarded_hashes =
                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();

            // A transaction that was added and discarded again within this call is reported as
            // an error to the caller.
            for res in &mut results {
                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
                    discarded_hashes.contains(hash)
                {
                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
                }
            }
        };

        results
    }
727
728 fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
733 if let Some(sidecar) = meta.blob_sidecar {
735 let hash = *meta.added.hash();
736 self.on_new_blob_sidecar(&hash, &sidecar);
737 self.insert_blob(hash, sidecar);
738 }
739
740 if let Some(replaced) = meta.added.replaced_blob_transaction() {
742 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
743 self.delete_blob(replaced);
744 }
745
746 if let Some(discarded) = meta.added.discarded_transactions() {
748 self.delete_discarded_blobs(discarded.iter());
749 }
750
751 if let Some(pending) = meta.added.as_pending() {
753 self.on_new_pending_transaction(pending);
754 }
755
756 self.notify_event_listeners(&meta.added);
758
759 self.on_new_transaction(meta.added.into_new_transaction_event());
761 }
762
763 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
772 let mut needs_cleanup = false;
773
774 {
775 let listeners = self.pending_transaction_listener.read();
776 for listener in listeners.iter() {
777 if !listener.send_all(pending.pending_transactions(listener.kind)) {
778 needs_cleanup = true;
779 }
780 }
781 }
782
783 if needs_cleanup {
785 self.pending_transaction_listener
786 .write()
787 .retain(|listener| !listener.sender.is_closed());
788 }
789 }
790
791 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
800 let mut needs_cleanup = false;
801
802 {
803 let listeners = self.transaction_listener.read();
804 for listener in listeners.iter() {
805 if listener.kind.is_propagate_only() && !event.transaction.propagate {
806 if listener.sender.is_closed() {
807 needs_cleanup = true;
808 }
809 continue
811 }
812
813 if !listener.send(event.clone()) {
814 needs_cleanup = true;
815 }
816 }
817 }
818
819 if needs_cleanup {
821 self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
822 }
823 }
824
825 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
827 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
828 if sidecar_listeners.is_empty() {
829 return
830 }
831 let sidecar = Arc::new(sidecar.clone());
832 sidecar_listeners.retain_mut(|listener| {
833 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
834 match listener.sender.try_send(new_blob_event) {
835 Ok(()) => true,
836 Err(err) => {
837 if matches!(err, mpsc::error::TrySendError::Full(_)) {
838 debug!(
839 target: "txpool",
840 "[{:?}] failed to send blob sidecar; channel full",
841 sidecar,
842 );
843 true
844 } else {
845 false
846 }
847 }
848 }
849 })
850 }
851
852 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
854 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
855
856 let mut needs_pending_cleanup = false;
858 {
859 let listeners = self.pending_transaction_listener.read();
860 for listener in listeners.iter() {
861 if !listener.send_all(outcome.pending_transactions(listener.kind)) {
862 needs_pending_cleanup = true;
863 }
864 }
865 }
866 if needs_pending_cleanup {
867 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
868 }
869
870 let mut needs_tx_cleanup = false;
872 {
873 let listeners = self.transaction_listener.read();
874 for listener in listeners.iter() {
875 if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
876 needs_tx_cleanup = true;
877 }
878 }
879 }
880 if needs_tx_cleanup {
881 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
882 }
883
884 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
885
886 self.with_event_listener(|listener| {
888 for tx in &mined {
889 listener.mined(tx, block_hash);
890 }
891 for tx in &promoted {
892 listener.pending(tx.hash(), None);
893 }
894 for tx in &discarded {
895 listener.discarded(tx.hash());
896 }
897 })
898 }
899
900 pub fn notify_on_transaction_updates(
909 &self,
910 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
911 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
912 ) {
913 if !promoted.is_empty() {
915 let mut needs_pending_cleanup = false;
916 {
917 let listeners = self.pending_transaction_listener.read();
918 for listener in listeners.iter() {
919 let promoted_hashes = promoted.iter().filter_map(|tx| {
920 if listener.kind.is_propagate_only() && !tx.propagate {
921 None
922 } else {
923 Some(*tx.hash())
924 }
925 });
926 if !listener.send_all(promoted_hashes) {
927 needs_pending_cleanup = true;
928 }
929 }
930 }
931 if needs_pending_cleanup {
932 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
933 }
934
935 let mut needs_tx_cleanup = false;
937 {
938 let listeners = self.transaction_listener.read();
939 for listener in listeners.iter() {
940 let promoted_txs = promoted.iter().filter_map(|tx| {
941 if listener.kind.is_propagate_only() && !tx.propagate {
942 None
943 } else {
944 Some(NewTransactionEvent::pending(tx.clone()))
945 }
946 });
947 if !listener.send_all(promoted_txs) {
948 needs_tx_cleanup = true;
949 }
950 }
951 }
952 if needs_tx_cleanup {
953 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
954 }
955 }
956
957 self.with_event_listener(|listener| {
958 for tx in &promoted {
959 listener.pending(tx.hash(), None);
960 }
961 for tx in &discarded {
962 listener.discarded(tx.hash());
963 }
964 });
965
966 if !discarded.is_empty() {
967 self.delete_discarded_blobs(discarded.iter());
970 }
971 }
972
973 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
982 self.with_event_listener(|listener| match tx {
983 AddedTransaction::Pending(tx) => {
984 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
985
986 listener.pending(transaction.hash(), replaced.clone());
987 for tx in promoted {
988 listener.pending(tx.hash(), None);
989 }
990 for tx in discarded {
991 listener.discarded(tx.hash());
992 }
993 }
994 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
995 listener.queued(transaction.hash(), queued_reason.clone());
996 if let Some(replaced) = replaced {
997 listener.replaced(replaced.clone(), *transaction.hash());
998 }
999 }
1000 });
1001 }
1002
1003 pub fn best_transactions(&self) -> BestTransactions<T> {
1005 self.get_pool_data().best_transactions()
1006 }
1007
1008 pub fn best_transactions_with_attributes(
1011 &self,
1012 best_transactions_attributes: BestTransactionsAttributes,
1013 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1014 {
1015 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1016 }
1017
1018 pub fn pending_transactions_max(
1020 &self,
1021 max: usize,
1022 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1023 self.get_pool_data().pending_transactions_iter().take(max).collect()
1024 }
1025
1026 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1028 self.get_pool_data().pending_transactions()
1029 }
1030
1031 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1033 self.get_pool_data().queued_transactions()
1034 }
1035
1036 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1038 let pool = self.get_pool_data();
1039 AllPoolTransactions {
1040 pending: pool.pending_transactions(),
1041 queued: pool.queued_transactions(),
1042 }
1043 }
1044
1045 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1047 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1048 }
1049
1050 pub fn remove_transactions(
1055 &self,
1056 hashes: Vec<TxHash>,
1057 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1058 if hashes.is_empty() {
1059 return Vec::new()
1060 }
1061 let removed = self.pool.write().remove_transactions(hashes);
1062
1063 self.with_event_listener(|listener| listener.discarded_many(&removed));
1064
1065 removed
1066 }
1067
1068 pub fn remove_transactions_and_descendants(
1071 &self,
1072 hashes: Vec<TxHash>,
1073 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1074 if hashes.is_empty() {
1075 return Vec::new()
1076 }
1077 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1078
1079 self.with_event_listener(|listener| {
1080 for tx in &removed {
1081 listener.discarded(tx.hash());
1082 }
1083 });
1084
1085 removed
1086 }
1087
1088 pub fn remove_transactions_by_sender(
1090 &self,
1091 sender: Address,
1092 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1093 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1094 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1095
1096 self.with_event_listener(|listener| listener.discarded_many(&removed));
1097
1098 removed
1099 }
1100
1101 pub fn prune_transactions(
1106 &self,
1107 hashes: Vec<TxHash>,
1108 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1109 if hashes.is_empty() {
1110 return Vec::new()
1111 }
1112
1113 self.pool.write().prune_transactions(hashes)
1114 }
1115
1116 pub fn retain_unknown<A>(&self, announcement: &mut A)
1118 where
1119 A: HandleMempoolData,
1120 {
1121 if announcement.is_empty() {
1122 return
1123 }
1124 let pool = self.get_pool_data();
1125 announcement.retain_by_hash(|tx| !pool.contains(tx))
1126 }
1127
1128 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1130 self.get_pool_data().get(tx_hash)
1131 }
1132
1133 pub fn get_transactions_by_sender(
1135 &self,
1136 sender: Address,
1137 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1138 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1139 self.get_pool_data().get_transactions_by_sender(sender_id)
1140 }
1141
1142 pub fn get_pending_transaction_by_sender_and_nonce(
1144 &self,
1145 sender: Address,
1146 nonce: u64,
1147 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1148 let sender_id = self.sender_id(&sender)?;
1149 self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1150 }
1151
1152 pub fn get_queued_transactions_by_sender(
1154 &self,
1155 sender: Address,
1156 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1157 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1158 self.get_pool_data().queued_txs_by_sender(sender_id)
1159 }
1160
1161 pub fn pending_transactions_with_predicate(
1163 &self,
1164 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1165 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1166 self.get_pool_data().pending_transactions_with_predicate(predicate)
1167 }
1168
1169 pub fn get_pending_transactions_by_sender(
1171 &self,
1172 sender: Address,
1173 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1174 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1175 self.get_pool_data().pending_txs_by_sender(sender_id)
1176 }
1177
1178 pub fn get_highest_transaction_by_sender(
1180 &self,
1181 sender: Address,
1182 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1183 let sender_id = self.sender_id(&sender)?;
1184 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1185 }
1186
1187 pub fn get_highest_consecutive_transaction_by_sender(
1189 &self,
1190 sender: Address,
1191 on_chain_nonce: u64,
1192 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1193 let sender_id = self.sender_id(&sender)?;
1194 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1195 sender_id.into_transaction_id(on_chain_nonce),
1196 )
1197 }
1198
1199 pub fn get_transaction_by_transaction_id(
1201 &self,
1202 transaction_id: &TransactionId,
1203 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1204 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1205 }
1206
1207 pub fn get_transactions_by_origin(
1209 &self,
1210 origin: TransactionOrigin,
1211 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1212 self.get_pool_data()
1213 .all()
1214 .transactions_iter()
1215 .filter(|tx| tx.origin == origin)
1216 .cloned()
1217 .collect()
1218 }
1219
1220 pub fn get_pending_transactions_by_origin(
1222 &self,
1223 origin: TransactionOrigin,
1224 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1225 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1226 }
1227
1228 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1232 if txs.is_empty() {
1233 return Vec::new()
1234 }
1235 self.get_pool_data().get_all(txs).collect()
1236 }
1237
1238 fn get_all_propagatable(
1242 &self,
1243 txs: &[TxHash],
1244 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1245 if txs.is_empty() {
1246 return Vec::new()
1247 }
1248 let pool = self.get_pool_data();
1249 txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1250 }
1251
1252 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1254 if txs.is_empty() {
1255 return
1256 }
1257 self.with_event_listener(|listener| {
1258 txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1259 });
1260 }
1261
1262 pub fn len(&self) -> usize {
1264 self.get_pool_data().len()
1265 }
1266
1267 pub fn is_empty(&self) -> bool {
1269 self.get_pool_data().is_empty()
1270 }
1271
1272 pub fn is_exceeded(&self) -> bool {
1274 self.pool.read().is_exceeded()
1275 }
1276
1277 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1279 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1280 if let Err(err) = self.blob_store.insert(hash, blob) {
1281 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1282 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1283 }
1284 self.update_blob_store_metrics();
1285 }
1286
1287 pub fn delete_blob(&self, blob: TxHash) {
1289 let _ = self.blob_store.delete(blob);
1290 }
1291
1292 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1294 let _ = self.blob_store.delete_all(txs);
1295 }
1296
1297 pub fn cleanup_blobs(&self) {
1299 let stat = self.blob_store.cleanup();
1300 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1301 self.update_blob_store_metrics();
1302 }
1303
1304 fn update_blob_store_metrics(&self) {
1305 if let Some(data_size) = self.blob_store.data_size_hint() {
1306 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1307 }
1308 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1309 }
1310
1311 fn delete_discarded_blobs<'a>(
1313 &'a self,
1314 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1315 ) {
1316 let blob_txs = transactions
1317 .into_iter()
1318 .filter(|tx| tx.transaction.is_eip4844())
1319 .map(|tx| *tx.hash())
1320 .collect();
1321 self.delete_blobs(blob_txs);
1322 }
1323}
1324
1325impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1326 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1327 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1328 }
1329}
1330
/// Data collected for a transaction that was inserted into the pool, kept so the follow-up work
/// (blob store updates, listener notifications) can be done afterwards.
#[derive(Debug)]
struct AddedTransactionMeta<T: PoolTransaction> {
    /// What the pool reported about the inserted transaction.
    added: AddedTransaction<T>,
    /// The blob sidecar that came with the transaction, if any.
    blob_sidecar: Option<BlobTransactionSidecarVariant>,
}
1342
/// Represents a transaction that was added to the pending sub-pool.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {
    /// The transaction that was added.
    pub transaction: Arc<ValidPoolTransaction<T>>,
    /// The transaction that was replaced by the added one, if any.
    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that were promoted; they are reported after `transaction` by
    /// `pending_transactions`.
    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that were discarded (presumably as a consequence of this insertion).
    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1355
1356impl<T: PoolTransaction> AddedPendingTransaction<T> {
1357 pub(crate) fn pending_transactions(
1363 &self,
1364 kind: TransactionListenerKind,
1365 ) -> impl Iterator<Item = B256> + '_ {
1366 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1367 PendingTransactionIter { kind, iter }
1368 }
1369}
1370
/// Iterator adapter that yields the hashes of the wrapped transactions, filtered by listener
/// kind.
pub(crate) struct PendingTransactionIter<Iter> {
    /// Listener kind that decides whether non-propagated transactions are skipped.
    kind: TransactionListenerKind,
    /// Underlying iterator over the candidate transactions.
    iter: Iter,
}
1375
1376impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1377where
1378 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1379 T: PoolTransaction + 'a,
1380{
1381 type Item = B256;
1382
1383 fn next(&mut self) -> Option<Self::Item> {
1384 loop {
1385 let next = self.iter.next()?;
1386 if self.kind.is_propagate_only() && !next.propagate {
1387 continue
1388 }
1389 return Some(*next.hash())
1390 }
1391 }
1392}
1393
/// Iterator adapter that yields full [`NewTransactionEvent`]s for the wrapped transactions,
/// filtered by listener kind.
pub(crate) struct FullPendingTransactionIter<Iter> {
    /// Listener kind that decides whether non-propagated transactions are skipped.
    kind: TransactionListenerKind,
    /// Underlying iterator over the candidate transactions.
    iter: Iter,
}
1399
1400impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1401where
1402 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1403 T: PoolTransaction + 'a,
1404{
1405 type Item = NewTransactionEvent<T>;
1406
1407 fn next(&mut self) -> Option<Self::Item> {
1408 loop {
1409 let next = self.iter.next()?;
1410 if self.kind.is_propagate_only() && !next.propagate {
1411 continue
1412 }
1413 return Some(NewTransactionEvent {
1414 subpool: SubPool::Pending,
1415 transaction: next.clone(),
1416 })
1417 }
1418 }
1419}
1420
/// Represents a transaction that was added to the pool, either as pending or as parked.
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {
    /// The transaction was added to the pending sub-pool.
    Pending(AddedPendingTransaction<T>),
    /// The transaction was added to a sub-pool other than pending.
    Parked {
        /// The transaction that was added.
        transaction: Arc<ValidPoolTransaction<T>>,
        /// The transaction that was replaced by the added one, if any.
        replaced: Option<Arc<ValidPoolTransaction<T>>>,
        /// The sub-pool the transaction was placed in.
        subpool: SubPool,
        /// The reason the transaction is queued, if one was recorded.
        queued_reason: Option<QueuedReason>,
    },
}
1439
1440impl<T: PoolTransaction> AddedTransaction<T> {
1441 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1443 match self {
1444 Self::Pending(tx) => Some(tx),
1445 _ => None,
1446 }
1447 }
1448
1449 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1451 match self {
1452 Self::Pending(tx) => tx.replaced.as_ref(),
1453 Self::Parked { replaced, .. } => replaced.as_ref(),
1454 }
1455 }
1456
1457 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1459 match self {
1460 Self::Pending(tx) => Some(&tx.discarded),
1461 Self::Parked { .. } => None,
1462 }
1463 }
1464
1465 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1467 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1468 }
1469
1470 pub fn hash(&self) -> &TxHash {
1472 match self {
1473 Self::Pending(tx) => tx.transaction.hash(),
1474 Self::Parked { transaction, .. } => transaction.hash(),
1475 }
1476 }
1477
1478 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1480 match self {
1481 Self::Pending(tx) => {
1482 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1483 }
1484 Self::Parked { transaction, subpool, .. } => {
1485 NewTransactionEvent { transaction, subpool }
1486 }
1487 }
1488 }
1489
1490 pub(crate) const fn subpool(&self) -> SubPool {
1492 match self {
1493 Self::Pending(_) => SubPool::Pending,
1494 Self::Parked { subpool, .. } => *subpool,
1495 }
1496 }
1497
1498 #[cfg(test)]
1500 pub(crate) fn id(&self) -> &TransactionId {
1501 match self {
1502 Self::Pending(added) => added.transaction.id(),
1503 Self::Parked { transaction, .. } => transaction.id(),
1504 }
1505 }
1506
1507 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1509 match self {
1510 Self::Pending(_) => None,
1511 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1512 }
1513 }
1514
1515 pub fn transaction_state(&self) -> AddedTransactionState {
1517 match self.subpool() {
1518 SubPool::Pending => AddedTransactionState::Pending,
1519 _ => {
1520 if let Some(reason) = self.queued_reason() {
1523 AddedTransactionState::Queued(reason.clone())
1524 } else {
1525 AddedTransactionState::Queued(QueuedReason::NonceGap)
1527 }
1528 }
1529 }
1530 }
1531}
1532
/// The reason a transaction is queued rather than pending.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// The transaction has a nonce gap.
    NonceGap,
    /// The transaction has parked ancestors.
    ParkedAncestors,
    /// The sender's balance is insufficient.
    InsufficientBalance,
    /// The transaction uses too much gas.
    TooMuchGas,
    /// The transaction does not satisfy the base fee.
    InsufficientBaseFee,
    /// The transaction does not satisfy the blob fee.
    InsufficientBlobFee,
}
1549
/// The state of a transaction after it was added to the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddedTransactionState {
    /// The transaction is pending.
    Pending,
    /// The transaction is queued, for the contained reason.
    Queued(QueuedReason),
}
1558
1559impl AddedTransactionState {
1560 pub const fn is_queued(&self) -> bool {
1562 matches!(self, Self::Queued(_))
1563 }
1564
1565 pub const fn is_pending(&self) -> bool {
1567 matches!(self, Self::Pending)
1568 }
1569
1570 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1572 match self {
1573 Self::Queued(reason) => Some(reason),
1574 Self::Pending => None,
1575 }
1576 }
1577}
1578
/// The outcome of adding a transaction: its hash together with the state it ended up in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AddedTransactionOutcome {
    /// The hash of the transaction.
    pub hash: TxHash,
    /// The state of the transaction after it was added.
    pub state: AddedTransactionState,
}
1587
1588impl AddedTransactionOutcome {
1589 pub const fn is_queued(&self) -> bool {
1591 self.state.is_queued()
1592 }
1593
1594 pub const fn is_pending(&self) -> bool {
1596 self.state.is_pending()
1597 }
1598}
1599
/// Bundles the changes produced by a canonical state update of the pool.
#[derive(Debug)]
pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
    /// Hash of the block this outcome refers to.
    pub(crate) block_hash: B256,
    /// Hashes of the transactions that were mined.
    pub(crate) mined: Vec<TxHash>,
    /// Transactions that were promoted; these are what `pending_transactions` and
    /// `full_pending_transactions` iterate over.
    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that were discarded.
    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1612
1613impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1614 pub(crate) fn pending_transactions(
1620 &self,
1621 kind: TransactionListenerKind,
1622 ) -> impl Iterator<Item = B256> + '_ {
1623 let iter = self.promoted.iter();
1624 PendingTransactionIter { kind, iter }
1625 }
1626
1627 pub(crate) fn full_pending_transactions(
1633 &self,
1634 kind: TransactionListenerKind,
1635 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1636 let iter = self.promoted.iter();
1637 FullPendingTransactionIter { kind, iter }
1638 }
1639}
1640
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool limit allows and checks that the
    /// pool's blob store ends up equal to a store that only received the first `max_txs`
    /// sidecars.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        let blobs = {
            // Read the JSON test fixture into a string.
            let json_content = fs::read_to_string(
                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
            )
            .expect("Failed to read the blob data file");

            // Parse the fixture into a generic JSON value.
            let json_value: serde_json::Value =
                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");

            // A single-element list holding the hex blob taken from the fixture.
            vec![
                // The "data" field of the fixture, as an owned string.
                json_value
                    .get("data")
                    .unwrap()
                    .as_str()
                    .expect("Data is not a valid string")
                    .to_string(),
            ]
        };

        // Build an EIP-4844 sidecar from the hex blobs.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
        );

        // Blob sub-pool limit; the assertions below rely on `max_txs` being 1000.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Test pool that uses the blob limit above and defaults for everything else.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        // Give the pool a block info that has a pending blob fee set.
        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Reference blob store that the pool's own store is compared against at the end.
        let blob_store = InMemoryBlobStore::default();

        // Add ten more blob transactions than the sub-pool limit allows.
        for n in 0..blob_limit.max_txs + 10 {
            // Mock EIP-4844 transaction carrying the sidecar.
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Use a non-zero size so that the summed `blob_size` can be asserted below.
            tx.set_size(1844674407370951);

            // Only the first `max_txs` sidecars go into the reference store.
            if n < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            // Hand the transaction to the pool as an externally received, valid transaction.
            test_pool.add_transactions(
                TransactionOrigin::External,
                [TransactionValidationOutcome::Valid {
                    balance: U256::from(1_000),
                    state_nonce: 0,
                    bytecode_hash: None,
                    transaction: ValidTransaction::ValidWithSidecar {
                        transaction: tx,
                        sidecar: sidecar.clone(),
                    },
                    propagate: true,
                    authorities: None,
                }],
            );
        }

        // The blob sub-pool holds exactly `max_txs` transactions.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // The reported blob size is 1000 times the per-transaction size set above.
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // The pool's blob store equals the reference store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    /// Checks that an authority passed alongside a valid transaction gets a sender id in the
    /// pool's identifiers.
    #[test]
    fn test_auths_stored_in_identifiers() {
        // Test pool with the default configuration.
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        // Authority address and a mock EIP-7702 transaction.
        let auth = Address::new([1; 20]);
        let tx = MockTransaction::eip7702();

        test_pool.add_transactions(
            TransactionOrigin::Local,
            [TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(tx),
                propagate: true,
                authorities: Some(vec![auth]),
            }],
        );

        // The authority address must now resolve to a sender id.
        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
    }

    /// Checks that sender-based queries for an unknown address return nothing and leave the
    /// address without a sender id.
    #[test]
    fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
        let sender = Address::new([9; 20]);

        // The address has no sender id before any query is made.
        assert_eq!(test_pool.sender_id(&sender), None);
        assert!(test_pool.get_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_pending_transaction_by_sender_and_nonce(sender, 0).is_none());
        assert!(test_pool.get_queued_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_pending_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_highest_transaction_by_sender(sender).is_none());
        assert!(test_pool.get_highest_consecutive_transaction_by_sender(sender, 0).is_none());
        assert!(test_pool.remove_transactions_by_sender(sender).is_empty());
        // ...and still has none after all of the queries above.
        assert_eq!(test_pool.sender_id(&sender), None);
    }
}