1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{
92 map::{AddressSet, HashSet},
93 Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103 fmt,
104 sync::{
105 atomic::{AtomicBool, Ordering},
106 Arc,
107 },
108 time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
/// Channel capacity constant (2048) named for pending transaction listeners.
///
/// NOTE(review): not referenced in the visible part of this file, where the pending listener
/// channel is sized from `PoolConfig::pending_tx_listener_buffer_size`; presumably this is that
/// field's default — confirm.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Channel capacity constant (1024) named for new transaction listeners.
///
/// NOTE(review): not referenced in the visible part of this file, where the new transaction
/// listener channel is sized from `PoolConfig::new_tx_listener_buffer_size`; presumably this is
/// that field's default — confirm.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the bounded channel created for every blob sidecar listener.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
/// Shared inner state of the transaction pool.
///
/// Bundles the validator, the blob store, the lock-protected [`TxPool`] and all registered
/// listeners.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Maps sender addresses to their internal [`SenderId`].
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator; it is also told about every new head block.
    validator: V,
    /// Storage for the blob sidecars of EIP-4844 transactions.
    blob_store: S,
    /// The pool of transactions, guarded by a read/write lock.
    pool: RwLock<TxPool<T>>,
    /// Pool configuration; supplies the buffer sizes of listener channels.
    config: PoolConfig,
    /// Broadcasts per-transaction events to subscribers.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Set when an event subscription is installed and cleared once the broadcast is empty, so
    /// event emission can be skipped without taking the `event_listener` lock.
    has_event_listeners: AtomicBool,
    /// Listeners that receive the hashes of pending transactions.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners that receive full [`NewTransactionEvent`]s.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listeners that receive the sidecars of newly added blob transactions.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics describing the blob store (entry count, byte size, failed inserts/deletes).
    blob_store_metrics: BlobStoreMetrics,
}
167
168impl<V, T, S> PoolInner<V, T, S>
171where
172 V: TransactionValidator,
173 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174 S: BlobStore,
175{
176 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178 Self {
179 identifiers: Default::default(),
180 validator,
181 event_listener: Default::default(),
182 has_event_listeners: AtomicBool::new(false),
183 pool: RwLock::new(TxPool::new(ordering, config.clone())),
184 pending_transaction_listener: Default::default(),
185 transaction_listener: Default::default(),
186 blob_transaction_sidecar_listener: Default::default(),
187 config,
188 blob_store,
189 blob_store_metrics: Default::default(),
190 }
191 }
192
/// Returns a reference to the blob store used by this pool.
pub const fn blob_store(&self) -> &S {
    &self.blob_store
}
197
/// Returns the [`PoolSize`] reported by the inner pool, read under the pool's read lock.
pub fn size(&self) -> PoolSize {
    self.get_pool_data().size()
}
202
/// Returns the [`BlockInfo`] currently tracked by the inner pool.
pub fn block_info(&self) -> BlockInfo {
    self.get_pool_data().block_info()
}
207 pub fn set_block_info(&self, info: BlockInfo) {
212 let outcome = self.pool.write().set_block_info(info);
213
214 self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216 }
217
/// Returns the [`SenderId`] for the given address, creating one if the address is unknown.
///
/// Takes the write lock on the sender identifiers.
pub fn get_sender_id(&self, addr: Address) -> SenderId {
    self.identifiers.write().sender_id_or_create(addr)
}
227
/// Looks up the [`SenderId`] for the given address without creating one.
///
/// Only takes the read lock on the sender identifiers.
pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
    self.identifiers.read().sender_id(addr)
}
235
/// Returns the [`SenderId`]s for all given addresses, creating identifiers where needed.
///
/// Takes the write lock on the sender identifiers once for the whole batch.
pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
    self.identifiers.write().sender_ids_or_create(addrs)
}
240
/// Returns the set of sender addresses as reported by the inner pool.
pub fn unique_senders(&self) -> AddressSet {
    self.get_pool_data().unique_senders()
}
245
246 fn changed_senders(
249 &self,
250 accs: impl Iterator<Item = ChangedAccount>,
251 ) -> FxHashMap<SenderId, SenderInfo> {
252 let identifiers = self.identifiers.read();
253 accs.into_iter()
254 .filter_map(|acc| {
255 let ChangedAccount { address, nonce, balance } = acc;
256 let sender_id = identifiers.sender_id(&address)?;
257 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
258 })
259 .collect()
260 }
261
/// Returns the configuration this pool was created with.
pub const fn config(&self) -> &PoolConfig {
    &self.config
}
266
/// Returns a reference to the transaction validator.
pub const fn validator(&self) -> &V {
    &self.validator
}
271
272 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
275 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
276 let listener = PendingTransactionHashListener { sender, kind };
277
278 let mut listeners = self.pending_transaction_listener.write();
279 listeners.retain(|l| !l.sender.is_closed());
281 listeners.push(listener);
282
283 rx
284 }
285
286 pub fn add_new_transaction_listener(
288 &self,
289 kind: TransactionListenerKind,
290 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
291 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
292 let listener = TransactionListener { sender, kind };
293
294 let mut listeners = self.transaction_listener.write();
295 listeners.retain(|l| !l.sender.is_closed());
297 listeners.push(listener);
298
299 rx
300 }
301 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
304 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
305 let listener = BlobTransactionSidecarListener { sender };
306 self.blob_transaction_sidecar_listener.lock().push(listener);
307 rx
308 }
309
310 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
313 if !self.get_pool_data().contains(&tx_hash) {
314 return None
315 }
316 let mut listener = self.event_listener.write();
317 let events = listener.subscribe(tx_hash);
318 self.mark_event_listener_installed();
319 Some(events)
320 }
321
322 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
324 let mut listener = self.event_listener.write();
325 let events = listener.subscribe_all();
326 self.mark_event_listener_installed();
327 events
328 }
329
/// Returns the current value of the `has_event_listeners` flag (relaxed load).
#[inline]
fn has_event_listeners(&self) -> bool {
    self.has_event_listeners.load(Ordering::Relaxed)
}
334
/// Sets the `has_event_listeners` flag (relaxed store).
///
/// Every caller in this file invokes it while holding the `event_listener` write lock.
#[inline]
fn mark_event_listener_installed(&self) {
    self.has_event_listeners.store(true, Ordering::Relaxed);
}
339
/// Clears the `has_event_listeners` flag when the given broadcast has become empty.
///
/// The flag is never set here; it is only reset.
#[inline]
fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
    if listener.is_empty() {
        self.has_event_listeners.store(false, Ordering::Relaxed);
    }
}
346
347 #[inline]
348 fn with_event_listener<F>(&self, emit: F)
349 where
350 F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
351 {
352 if !self.has_event_listeners() {
353 return
354 }
355 let mut listener = self.event_listener.write();
356 if !listener.is_empty() {
357 emit(&mut listener);
358 }
359 self.update_event_listener_state(&listener);
360 }
361
/// Acquires the read lock on the inner pool and returns the guard.
pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
    self.pool.read()
}
366
367 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
369 let mut out = Vec::new();
370 self.append_pooled_transactions(&mut out);
371 out
372 }
373
374 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
376 let mut out = Vec::new();
377 self.append_pooled_transactions_hashes(&mut out);
378 out
379 }
380
381 pub fn pooled_transactions_max(
383 &self,
384 max: usize,
385 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
386 let mut out = Vec::new();
387 self.append_pooled_transactions_max(max, &mut out);
388 out
389 }
390
391 pub fn append_pooled_transactions(
393 &self,
394 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
395 ) {
396 out.extend(
397 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
398 );
399 }
400
401 pub fn append_pooled_transaction_elements(
404 &self,
405 tx_hashes: &[TxHash],
406 limit: GetPooledTransactionLimit,
407 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
408 ) where
409 <V as TransactionValidator>::Transaction: EthPoolTransaction,
410 {
411 let transactions = self.get_all_propagatable(tx_hashes);
412 let mut size = 0;
413 for transaction in transactions {
414 let encoded_len = transaction.encoded_length();
415 let Some(pooled) = self.to_pooled_transaction(transaction) else {
416 continue;
417 };
418
419 size += encoded_len;
420 out.push(pooled.into_inner());
421
422 if limit.exceeds(size) {
423 break
424 }
425 }
426 }
427
428 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
431 out.extend(
432 self.get_pool_data()
433 .all()
434 .transactions_iter()
435 .filter(|tx| tx.propagate)
436 .map(|tx| *tx.hash()),
437 );
438 }
439
440 pub fn append_pooled_transactions_max(
443 &self,
444 max: usize,
445 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
446 ) {
447 out.extend(
448 self.get_pool_data()
449 .all()
450 .transactions_iter()
451 .filter(|tx| tx.propagate)
452 .take(max)
453 .cloned(),
454 );
455 }
456
457 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
459 if max == 0 {
460 return Vec::new();
461 }
462 self.get_pool_data()
463 .all()
464 .transactions_iter()
465 .filter(|tx| tx.propagate)
466 .take(max)
467 .map(|tx| *tx.hash())
468 .collect()
469 }
470
471 fn to_pooled_transaction(
476 &self,
477 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
478 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
479 where
480 <V as TransactionValidator>::Transaction: EthPoolTransaction,
481 {
482 if transaction.is_eip4844() {
483 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
484 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
485 } else {
486 transaction
487 .transaction
488 .clone_into_pooled()
489 .inspect_err(|err| {
490 debug!(
491 target: "txpool", %err,
492 "failed to convert transaction to pooled element; skipping",
493 );
494 })
495 .ok()
496 }
497 }
498
499 pub fn get_pooled_transaction_elements(
502 &self,
503 tx_hashes: Vec<TxHash>,
504 limit: GetPooledTransactionLimit,
505 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
506 where
507 <V as TransactionValidator>::Transaction: EthPoolTransaction,
508 {
509 let mut elements = Vec::new();
510 self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
511 elements.shrink_to_fit();
512 elements
513 }
514
515 pub fn get_pooled_transaction_element(
517 &self,
518 tx_hash: TxHash,
519 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
520 where
521 <V as TransactionValidator>::Transaction: EthPoolTransaction,
522 {
523 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
524 }
525
526 pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
528 trace!(target: "txpool", ?update, "updating pool on canonical state change");
529
530 let block_info = update.block_info();
531 let CanonicalStateUpdate {
532 new_tip, changed_accounts, mined_transactions, update_kind, ..
533 } = update;
534 self.validator.on_new_head_block(new_tip);
535
536 let changed_senders = self.changed_senders(changed_accounts.into_iter());
537
538 let outcome = self.pool.write().on_canonical_state_change(
540 block_info,
541 mined_transactions,
542 changed_senders,
543 update_kind,
544 );
545
546 self.delete_discarded_blobs(outcome.discarded.iter());
548
549 self.notify_on_new_state(outcome);
551 }
552
553 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
559 let changed_senders = self.changed_senders(accounts.into_iter());
560 let UpdateOutcome { promoted, discarded } =
561 self.pool.write().update_accounts(changed_senders);
562
563 self.notify_on_transaction_updates(promoted, discarded);
564 }
565
566 fn add_transaction(
574 &self,
575 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
576 origin: TransactionOrigin,
577 tx: TransactionValidationOutcome<T::Transaction>,
578 ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
579 match tx {
580 TransactionValidationOutcome::Valid {
581 balance,
582 state_nonce,
583 transaction,
584 propagate,
585 bytecode_hash,
586 authorities,
587 } => {
588 let sender_id = self.get_sender_id(transaction.sender());
589 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
590
591 let (transaction, blob_sidecar) = match transaction {
593 ValidTransaction::Valid(tx) => (tx, None),
594 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
595 debug_assert!(
596 transaction.is_eip4844(),
597 "validator returned sidecar for non EIP-4844 transaction"
598 );
599 (transaction, Some(sidecar))
600 }
601 };
602
603 let tx = ValidPoolTransaction {
604 transaction,
605 transaction_id,
606 propagate,
607 timestamp: Instant::now(),
608 origin,
609 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
610 };
611
612 let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
613 Ok(added) => added,
614 Err(err) => return (Err(err), None),
615 };
616 let hash = *added.hash();
617 let state = added.transaction_state();
618
619 let meta = AddedTransactionMeta { added, blob_sidecar };
620
621 (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
622 }
623 TransactionValidationOutcome::Invalid(tx, err) => {
624 self.with_event_listener(|listener| listener.invalid(tx.hash()));
625 (Err(PoolError::new(*tx.hash(), err)), None)
626 }
627 TransactionValidationOutcome::Error(tx_hash, err) => {
628 self.with_event_listener(|listener| listener.discarded(&tx_hash));
629 (Err(PoolError::other(tx_hash, err)), None)
630 }
631 }
632 }
633
634 pub fn add_transaction_and_subscribe(
636 &self,
637 origin: TransactionOrigin,
638 tx: TransactionValidationOutcome<T::Transaction>,
639 ) -> PoolResult<TransactionEvents> {
640 let listener = {
641 let mut listener = self.event_listener.write();
642 let events = listener.subscribe(tx.tx_hash());
643 self.mark_event_listener_installed();
644 events
645 };
646 let mut results = self.add_transactions(origin, std::iter::once(tx));
647 results.pop().expect("result length is the same as the input")?;
648 Ok(listener)
649 }
650
/// Adds all validated transactions to the pool, tagging each with the same `origin`.
///
/// Delegates to `add_transactions_with_origins`.
pub fn add_transactions(
    &self,
    origin: TransactionOrigin,
    transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
    self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
}
662
663 pub fn add_transactions_with_origins(
666 &self,
667 transactions: impl IntoIterator<
668 Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
669 >,
670 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
671 let (mut results, added_metas, discarded) = {
673 let mut pool = self.pool.write();
674 let mut added_metas = Vec::new();
675
676 let results = transactions
677 .into_iter()
678 .map(|(origin, tx)| {
679 let (result, meta) = self.add_transaction(&mut pool, origin, tx);
680
681 if result.is_ok() &&
683 let Some(meta) = meta
684 {
685 added_metas.push(meta);
686 }
687
688 result
689 })
690 .collect::<Vec<_>>();
691
692 let discarded = if results.iter().any(Result::is_ok) {
694 pool.discard_worst()
695 } else {
696 Default::default()
697 };
698
699 (results, added_metas, discarded)
700 };
701
702 for meta in added_metas {
703 self.on_added_transaction(meta);
704 }
705
706 if !discarded.is_empty() {
707 self.delete_discarded_blobs(discarded.iter());
709 self.with_event_listener(|listener| listener.discarded_many(&discarded));
710
711 let discarded_hashes =
712 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
713
714 for res in &mut results {
717 if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
718 discarded_hashes.contains(hash)
719 {
720 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
721 }
722 }
723 };
724
725 results
726 }
727
728 fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
733 if let Some(sidecar) = meta.blob_sidecar {
735 let hash = *meta.added.hash();
736 self.on_new_blob_sidecar(&hash, &sidecar);
737 self.insert_blob(hash, sidecar);
738 }
739
740 if let Some(replaced) = meta.added.replaced_blob_transaction() {
742 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
743 self.delete_blob(replaced);
744 }
745
746 if let Some(discarded) = meta.added.discarded_transactions() {
748 self.delete_discarded_blobs(discarded.iter());
749 }
750
751 if let Some(pending) = meta.added.as_pending() {
753 self.on_new_pending_transaction(pending);
754 }
755
756 self.notify_event_listeners(&meta.added);
758
759 self.on_new_transaction(meta.added.into_new_transaction_event());
761 }
762
763 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
772 let mut needs_cleanup = false;
773
774 {
775 let listeners = self.pending_transaction_listener.read();
776 for listener in listeners.iter() {
777 if !listener.send_all(pending.pending_transactions(listener.kind)) {
778 needs_cleanup = true;
779 }
780 }
781 }
782
783 if needs_cleanup {
785 self.pending_transaction_listener
786 .write()
787 .retain(|listener| !listener.sender.is_closed());
788 }
789 }
790
791 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
800 let mut needs_cleanup = false;
801
802 {
803 let listeners = self.transaction_listener.read();
804 for listener in listeners.iter() {
805 if listener.kind.is_propagate_only() && !event.transaction.propagate {
806 if listener.sender.is_closed() {
807 needs_cleanup = true;
808 }
809 continue
811 }
812
813 if !listener.send(event.clone()) {
814 needs_cleanup = true;
815 }
816 }
817 }
818
819 if needs_cleanup {
821 self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
822 }
823 }
824
825 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
827 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
828 if sidecar_listeners.is_empty() {
829 return
830 }
831 let sidecar = Arc::new(sidecar.clone());
832 sidecar_listeners.retain_mut(|listener| {
833 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
834 match listener.sender.try_send(new_blob_event) {
835 Ok(()) => true,
836 Err(err) => {
837 if matches!(err, mpsc::error::TrySendError::Full(_)) {
838 debug!(
839 target: "txpool",
840 "[{:?}] failed to send blob sidecar; channel full",
841 sidecar,
842 );
843 true
844 } else {
845 false
846 }
847 }
848 }
849 })
850 }
851
852 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
854 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
855
856 let mut needs_pending_cleanup = false;
858 {
859 let listeners = self.pending_transaction_listener.read();
860 for listener in listeners.iter() {
861 if !listener.send_all(outcome.pending_transactions(listener.kind)) {
862 needs_pending_cleanup = true;
863 }
864 }
865 }
866 if needs_pending_cleanup {
867 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
868 }
869
870 let mut needs_tx_cleanup = false;
872 {
873 let listeners = self.transaction_listener.read();
874 for listener in listeners.iter() {
875 if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
876 needs_tx_cleanup = true;
877 }
878 }
879 }
880 if needs_tx_cleanup {
881 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
882 }
883
884 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
885
886 self.with_event_listener(|listener| {
888 for tx in &mined {
889 listener.mined(tx, block_hash);
890 }
891 for tx in &promoted {
892 listener.pending(tx.hash(), None);
893 }
894 for tx in &discarded {
895 listener.discarded(tx.hash());
896 }
897 })
898 }
899
900 #[allow(clippy::type_complexity)]
909 pub fn notify_on_transaction_updates(
910 &self,
911 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
912 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
913 ) {
914 if !promoted.is_empty() {
916 let mut needs_pending_cleanup = false;
917 {
918 let listeners = self.pending_transaction_listener.read();
919 for listener in listeners.iter() {
920 let promoted_hashes = promoted.iter().filter_map(|tx| {
921 if listener.kind.is_propagate_only() && !tx.propagate {
922 None
923 } else {
924 Some(*tx.hash())
925 }
926 });
927 if !listener.send_all(promoted_hashes) {
928 needs_pending_cleanup = true;
929 }
930 }
931 }
932 if needs_pending_cleanup {
933 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
934 }
935
936 let mut needs_tx_cleanup = false;
938 {
939 let listeners = self.transaction_listener.read();
940 for listener in listeners.iter() {
941 let promoted_txs = promoted.iter().filter_map(|tx| {
942 if listener.kind.is_propagate_only() && !tx.propagate {
943 None
944 } else {
945 Some(NewTransactionEvent::pending(tx.clone()))
946 }
947 });
948 if !listener.send_all(promoted_txs) {
949 needs_tx_cleanup = true;
950 }
951 }
952 }
953 if needs_tx_cleanup {
954 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
955 }
956 }
957
958 self.with_event_listener(|listener| {
959 for tx in &promoted {
960 listener.pending(tx.hash(), None);
961 }
962 for tx in &discarded {
963 listener.discarded(tx.hash());
964 }
965 });
966
967 if !discarded.is_empty() {
968 self.delete_discarded_blobs(discarded.iter());
971 }
972 }
973
974 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
983 self.with_event_listener(|listener| match tx {
984 AddedTransaction::Pending(tx) => {
985 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
986
987 listener.pending(transaction.hash(), replaced.clone());
988 for tx in promoted {
989 listener.pending(tx.hash(), None);
990 }
991 for tx in discarded {
992 listener.discarded(tx.hash());
993 }
994 }
995 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
996 listener.queued(transaction.hash(), queued_reason.clone());
997 if let Some(replaced) = replaced {
998 listener.replaced(replaced.clone(), *transaction.hash());
999 }
1000 }
1001 });
1002 }
1003
/// Returns the inner pool's [`BestTransactions`] iterator, created under the pool read lock.
pub fn best_transactions(&self) -> BestTransactions<T> {
    self.get_pool_data().best_transactions()
}
1008
/// Returns a boxed best-transactions iterator built by the inner pool for the given
/// attributes.
pub fn best_transactions_with_attributes(
    &self,
    best_transactions_attributes: BestTransactionsAttributes,
) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
{
    self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
}
1018
/// Returns at most `max` transactions from the inner pool's pending transaction iterator.
pub fn pending_transactions_max(
    &self,
    max: usize,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.get_pool_data().pending_transactions_iter().take(max).collect()
}
1026
/// Returns all pending transactions as reported by the inner pool.
pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.get_pool_data().pending_transactions()
}
1031
/// Returns all queued transactions as reported by the inner pool.
pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.get_pool_data().queued_transactions()
}
1036
1037 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1039 let pool = self.get_pool_data();
1040 AllPoolTransactions {
1041 pending: pool.pending_transactions(),
1042 queued: pool.queued_transactions(),
1043 }
1044 }
1045
/// Returns the hashes of all transactions in the pool, regardless of their `propagate` flag.
pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
    self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
}
1050
1051 pub fn remove_transactions(
1056 &self,
1057 hashes: Vec<TxHash>,
1058 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1059 if hashes.is_empty() {
1060 return Vec::new()
1061 }
1062 let removed = self.pool.write().remove_transactions(hashes);
1063
1064 self.with_event_listener(|listener| listener.discarded_many(&removed));
1065
1066 removed
1067 }
1068
1069 pub fn remove_transactions_and_descendants(
1072 &self,
1073 hashes: Vec<TxHash>,
1074 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1075 if hashes.is_empty() {
1076 return Vec::new()
1077 }
1078 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1079
1080 self.with_event_listener(|listener| {
1081 for tx in &removed {
1082 listener.discarded(tx.hash());
1083 }
1084 });
1085
1086 removed
1087 }
1088
1089 pub fn remove_transactions_by_sender(
1091 &self,
1092 sender: Address,
1093 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1094 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1095 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1096
1097 self.with_event_listener(|listener| listener.discarded_many(&removed));
1098
1099 removed
1100 }
1101
1102 pub fn prune_transactions(
1107 &self,
1108 hashes: Vec<TxHash>,
1109 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1110 if hashes.is_empty() {
1111 return Vec::new()
1112 }
1113
1114 self.pool.write().prune_transactions(hashes)
1115 }
1116
1117 pub fn retain_unknown<A>(&self, announcement: &mut A)
1119 where
1120 A: HandleMempoolData,
1121 {
1122 if announcement.is_empty() {
1123 return
1124 }
1125 let pool = self.get_pool_data();
1126 announcement.retain_by_hash(|tx| !pool.contains(tx))
1127 }
1128
/// Looks up a transaction by hash in the inner pool.
pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.get_pool_data().get(tx_hash)
}
1133
1134 pub fn get_transactions_by_sender(
1136 &self,
1137 sender: Address,
1138 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1139 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1140 self.get_pool_data().get_transactions_by_sender(sender_id)
1141 }
1142
1143 pub fn get_pending_transaction_by_sender_and_nonce(
1145 &self,
1146 sender: Address,
1147 nonce: u64,
1148 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1149 let sender_id = self.sender_id(&sender)?;
1150 self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1151 }
1152
1153 pub fn get_queued_transactions_by_sender(
1155 &self,
1156 sender: Address,
1157 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1158 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1159 self.get_pool_data().queued_txs_by_sender(sender_id)
1160 }
1161
/// Returns the pending transactions selected by the inner pool using `predicate`.
pub fn pending_transactions_with_predicate(
    &self,
    predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.get_pool_data().pending_transactions_with_predicate(predicate)
}
1169
1170 pub fn get_pending_transactions_by_sender(
1172 &self,
1173 sender: Address,
1174 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1175 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1176 self.get_pool_data().pending_txs_by_sender(sender_id)
1177 }
1178
1179 pub fn get_highest_transaction_by_sender(
1181 &self,
1182 sender: Address,
1183 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1184 let sender_id = self.sender_id(&sender)?;
1185 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1186 }
1187
1188 pub fn get_highest_consecutive_transaction_by_sender(
1190 &self,
1191 sender: Address,
1192 on_chain_nonce: u64,
1193 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1194 let sender_id = self.sender_id(&sender)?;
1195 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1196 sender_id.into_transaction_id(on_chain_nonce),
1197 )
1198 }
1199
1200 pub fn get_transaction_by_transaction_id(
1202 &self,
1203 transaction_id: &TransactionId,
1204 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1205 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1206 }
1207
1208 pub fn get_transactions_by_origin(
1210 &self,
1211 origin: TransactionOrigin,
1212 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1213 self.get_pool_data()
1214 .all()
1215 .transactions_iter()
1216 .filter(|tx| tx.origin == origin)
1217 .cloned()
1218 .collect()
1219 }
1220
/// Returns all pending transactions whose origin equals `origin`.
pub fn get_pending_transactions_by_origin(
    &self,
    origin: TransactionOrigin,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
}
1228
1229 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1233 if txs.is_empty() {
1234 return Vec::new()
1235 }
1236 self.get_pool_data().get_all(txs).collect()
1237 }
1238
1239 fn get_all_propagatable(
1243 &self,
1244 txs: &[TxHash],
1245 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1246 if txs.is_empty() {
1247 return Vec::new()
1248 }
1249 let pool = self.get_pool_data();
1250 txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1251 }
1252
1253 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1255 if txs.is_empty() {
1256 return
1257 }
1258 self.with_event_listener(|listener| {
1259 txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1260 });
1261 }
1262
/// Returns the length reported by the inner pool.
pub fn len(&self) -> usize {
    self.get_pool_data().len()
}
1267
/// Returns whether the inner pool reports itself as empty.
pub fn is_empty(&self) -> bool {
    self.get_pool_data().is_empty()
}
1272
/// Returns the inner pool's `is_exceeded` result.
///
/// NOTE(review): presumably true when a configured pool limit is exceeded — confirm against
/// `TxPool::is_exceeded`.
pub fn is_exceeded(&self) -> bool {
    self.pool.read().is_exceeded()
}
1277
1278 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1280 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1281 if let Err(err) = self.blob_store.insert(hash, blob) {
1282 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1283 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1284 }
1285 self.update_blob_store_metrics();
1286 }
1287
/// Asks the blob store to delete the blob stored under the given transaction hash.
///
/// The result of the delete call is deliberately ignored.
pub fn delete_blob(&self, blob: TxHash) {
    let _ = self.blob_store.delete(blob);
}
1292
/// Asks the blob store to delete the blobs stored under the given transaction hashes.
///
/// The result of the delete call is deliberately ignored.
pub fn delete_blobs(&self, txs: Vec<TxHash>) {
    let _ = self.blob_store.delete_all(txs);
}
1297
1298 pub fn cleanup_blobs(&self) {
1300 let stat = self.blob_store.cleanup();
1301 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1302 self.update_blob_store_metrics();
1303 }
1304
1305 fn update_blob_store_metrics(&self) {
1306 if let Some(data_size) = self.blob_store.data_size_hint() {
1307 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1308 }
1309 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1310 }
1311
1312 fn delete_discarded_blobs<'a>(
1314 &'a self,
1315 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1316 ) {
1317 let blob_txs = transactions
1318 .into_iter()
1319 .filter(|tx| tx.transaction.is_eip4844())
1320 .map(|tx| *tx.hash())
1321 .collect();
1322 self.delete_blobs(blob_txs);
1323 }
1324}
1325
1326impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1327 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1328 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1329 }
1330}
1331
1332#[derive(Debug)]
1337struct AddedTransactionMeta<T: PoolTransaction> {
1338 added: AddedTransaction<T>,
1340 blob_sidecar: Option<BlobTransactionSidecarVariant>,
1342}
1343
1344#[derive(Debug, Clone)]
1346pub struct AddedPendingTransaction<T: PoolTransaction> {
1347 pub transaction: Arc<ValidPoolTransaction<T>>,
1349 pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1351 pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1353 pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1355}
1356
1357impl<T: PoolTransaction> AddedPendingTransaction<T> {
1358 pub(crate) fn pending_transactions(
1364 &self,
1365 kind: TransactionListenerKind,
1366 ) -> impl Iterator<Item = B256> + '_ {
1367 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1368 PendingTransactionIter { kind, iter }
1369 }
1370}
1371
1372pub(crate) struct PendingTransactionIter<Iter> {
1373 kind: TransactionListenerKind,
1374 iter: Iter,
1375}
1376
1377impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1378where
1379 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1380 T: PoolTransaction + 'a,
1381{
1382 type Item = B256;
1383
1384 fn next(&mut self) -> Option<Self::Item> {
1385 loop {
1386 let next = self.iter.next()?;
1387 if self.kind.is_propagate_only() && !next.propagate {
1388 continue
1389 }
1390 return Some(*next.hash())
1391 }
1392 }
1393}
1394
1395pub(crate) struct FullPendingTransactionIter<Iter> {
1397 kind: TransactionListenerKind,
1398 iter: Iter,
1399}
1400
1401impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1402where
1403 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1404 T: PoolTransaction + 'a,
1405{
1406 type Item = NewTransactionEvent<T>;
1407
1408 fn next(&mut self) -> Option<Self::Item> {
1409 loop {
1410 let next = self.iter.next()?;
1411 if self.kind.is_propagate_only() && !next.propagate {
1412 continue
1413 }
1414 return Some(NewTransactionEvent {
1415 subpool: SubPool::Pending,
1416 transaction: next.clone(),
1417 })
1418 }
1419 }
1420}
1421
1422#[derive(Debug, Clone)]
1424pub enum AddedTransaction<T: PoolTransaction> {
1425 Pending(AddedPendingTransaction<T>),
1427 Parked {
1430 transaction: Arc<ValidPoolTransaction<T>>,
1432 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1434 subpool: SubPool,
1436 queued_reason: Option<QueuedReason>,
1438 },
1439}
1440
1441impl<T: PoolTransaction> AddedTransaction<T> {
1442 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1444 match self {
1445 Self::Pending(tx) => Some(tx),
1446 _ => None,
1447 }
1448 }
1449
1450 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1452 match self {
1453 Self::Pending(tx) => tx.replaced.as_ref(),
1454 Self::Parked { replaced, .. } => replaced.as_ref(),
1455 }
1456 }
1457
1458 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1460 match self {
1461 Self::Pending(tx) => Some(&tx.discarded),
1462 Self::Parked { .. } => None,
1463 }
1464 }
1465
1466 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1468 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1469 }
1470
1471 pub fn hash(&self) -> &TxHash {
1473 match self {
1474 Self::Pending(tx) => tx.transaction.hash(),
1475 Self::Parked { transaction, .. } => transaction.hash(),
1476 }
1477 }
1478
1479 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1481 match self {
1482 Self::Pending(tx) => {
1483 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1484 }
1485 Self::Parked { transaction, subpool, .. } => {
1486 NewTransactionEvent { transaction, subpool }
1487 }
1488 }
1489 }
1490
1491 pub(crate) const fn subpool(&self) -> SubPool {
1493 match self {
1494 Self::Pending(_) => SubPool::Pending,
1495 Self::Parked { subpool, .. } => *subpool,
1496 }
1497 }
1498
1499 #[cfg(test)]
1501 pub(crate) fn id(&self) -> &TransactionId {
1502 match self {
1503 Self::Pending(added) => added.transaction.id(),
1504 Self::Parked { transaction, .. } => transaction.id(),
1505 }
1506 }
1507
1508 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1510 match self {
1511 Self::Pending(_) => None,
1512 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1513 }
1514 }
1515
1516 pub fn transaction_state(&self) -> AddedTransactionState {
1518 match self.subpool() {
1519 SubPool::Pending => AddedTransactionState::Pending,
1520 _ => {
1521 if let Some(reason) = self.queued_reason() {
1524 AddedTransactionState::Queued(reason.clone())
1525 } else {
1526 AddedTransactionState::Queued(QueuedReason::NonceGap)
1528 }
1529 }
1530 }
1531 }
1532}
1533
/// Reason attached to a transaction that is queued rather than pending.
///
/// NOTE(review): the code that assigns these variants is outside this section; the variant
/// descriptions below follow the variant names and should be confirmed against it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// There is a gap in the nonce sequence.
    NonceGap,
    /// Ancestor transactions are parked.
    ParkedAncestors,
    /// The balance is insufficient.
    InsufficientBalance,
    /// The transaction uses too much gas.
    TooMuchGas,
    /// The base fee is insufficient.
    InsufficientBaseFee,
    /// The blob fee is insufficient.
    InsufficientBlobFee,
}
1550
1551#[derive(Debug, Clone, PartialEq, Eq)]
1553pub enum AddedTransactionState {
1554 Pending,
1556 Queued(QueuedReason),
1558}
1559
1560impl AddedTransactionState {
1561 pub const fn is_queued(&self) -> bool {
1563 matches!(self, Self::Queued(_))
1564 }
1565
1566 pub const fn is_pending(&self) -> bool {
1568 matches!(self, Self::Pending)
1569 }
1570
1571 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1573 match self {
1574 Self::Queued(reason) => Some(reason),
1575 Self::Pending => None,
1576 }
1577 }
1578}
1579
1580#[derive(Debug, Clone, PartialEq, Eq)]
1582pub struct AddedTransactionOutcome {
1583 pub hash: TxHash,
1585 pub state: AddedTransactionState,
1587}
1588
1589impl AddedTransactionOutcome {
1590 pub const fn is_queued(&self) -> bool {
1592 self.state.is_queued()
1593 }
1594
1595 pub const fn is_pending(&self) -> bool {
1597 self.state.is_pending()
1598 }
1599}
1600
1601#[derive(Debug)]
1603pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1604 pub(crate) block_hash: B256,
1606 pub(crate) mined: Vec<TxHash>,
1608 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1610 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1612}
1613
1614impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1615 pub(crate) fn pending_transactions(
1621 &self,
1622 kind: TransactionListenerKind,
1623 ) -> impl Iterator<Item = B256> + '_ {
1624 let iter = self.promoted.iter();
1625 PendingTransactionIter { kind, iter }
1626 }
1627
1628 pub(crate) fn full_pending_transactions(
1634 &self,
1635 kind: TransactionListenerKind,
1636 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1637 let iter = self.promoted.iter();
1638 FullPendingTransactionIter { kind, iter }
1639 }
1640}
1641
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool limit allows and checks that the
    /// pool's blob store ends up equal to a store that only received the first `max_txs`
    /// sidecars.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Read the hex-encoded blob from the crate's JSON fixture.
        let fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json");
        let raw = fs::read_to_string(fixture).expect("Failed to read the blob data file");
        let parsed: serde_json::Value =
            serde_json::from_str(&raw).expect("Failed to deserialize JSON");
        let blob_hex = parsed
            .get("data")
            .unwrap()
            .as_str()
            .expect("Data is not a valid string")
            .to_string();

        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(vec![blob_hex]).unwrap(),
        );

        // Blob sub-pool limit: 1000 transactions, byte size effectively unbounded.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
        let pool_config = PoolConfig { blob_limit, ..Default::default() };
        let test_pool = &TestPoolBuilder::default().with_config(pool_config).pool;

        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Reference store that only receives the sidecars of the first `max_txs` transactions.
        let blob_store = InMemoryBlobStore::default();

        // Submit ten transactions more than the limit allows.
        for idx in 0..blob_limit.max_txs + 10 {
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
            tx.set_size(1844674407370951);

            if idx < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            let outcome = TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::ValidWithSidecar {
                    transaction: tx,
                    sidecar: sidecar.clone(),
                },
                propagate: true,
                authorities: None,
            };
            test_pool.add_transactions(TransactionOrigin::External, [outcome]);
        }

        // The blob sub-pool holds exactly `max_txs` transactions ...
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // ... whose sizes add up to 1000 * 1844674407370951 ...
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // ... and the pool's blob store equals the reference store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    /// Checks that an authority passed along with a valid transaction gets a sender id.
    #[test]
    fn test_auths_stored_in_identifiers() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let auth = Address::new([1; 20]);
        let outcome = TransactionValidationOutcome::Valid {
            balance: U256::from(1_000),
            state_nonce: 0,
            bytecode_hash: None,
            transaction: ValidTransaction::Valid(MockTransaction::eip7702()),
            propagate: true,
            authorities: Some(vec![auth]),
        };
        test_pool.add_transactions(TransactionOrigin::Local, [outcome]);

        // The authority is expected under id 1 (presumably the transaction's sender took the
        // first id).
        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
    }

    /// Checks that sender-based queries for an address the pool has never seen return nothing
    /// and leave the address without a sender id.
    #[test]
    fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
        let unknown = Address::new([9; 20]);

        assert_eq!(test_pool.sender_id(&unknown), None);
        assert!(test_pool.get_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_pending_transaction_by_sender_and_nonce(unknown, 0).is_none());
        assert!(test_pool.get_queued_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_pending_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_highest_transaction_by_sender(unknown).is_none());
        assert!(test_pool.get_highest_consecutive_transaction_by_sender(unknown, 0).is_none());
        assert!(test_pool.remove_transactions_by_sender(unknown).is_empty());
        // Still no id after all the queries above.
        assert_eq!(test_pool.sender_id(&unknown), None);
    }
}