1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{
92 map::{AddressSet, HashSet},
93 Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103 fmt,
104 sync::{
105 atomic::{AtomicBool, Ordering},
106 Arc,
107 },
108 time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
/// Buffer size constant for pending-transaction hash listeners.
// NOTE(review): the visible code sizes these channels from
// `PoolConfig::pending_tx_listener_buffer_size`; presumably this is that field's default — confirm.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Buffer size constant for new-transaction listeners.
// NOTE(review): the visible code sizes these channels from
// `PoolConfig::new_tx_listener_buffer_size`; presumably this is that field's default — confirm.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the bounded channel created for every blob sidecar listener.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
/// Shared inner state of the transaction pool: the pool itself, the validator, the blob store
/// and every registered listener.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Maps sender addresses to their internal [`SenderId`]s.
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator; it is also told about every new head block.
    validator: V,
    /// Storage for blob sidecars.
    blob_store: S,
    /// The lock-protected transaction pool.
    pool: RwLock<TxPool<T>>,
    /// Pool configuration (also supplies the listener channel sizes).
    config: PoolConfig,
    /// Broadcaster for per-transaction and all-transactions event subscriptions.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Fast-path flag: set when a subscription is installed, cleared once the broadcaster
    /// reports that it is empty.
    has_event_listeners: AtomicBool,
    /// Listeners that receive hashes of pending transactions.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners that receive full new-transaction events.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listeners that receive newly stored blob sidecars.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics describing the blob store.
    blob_store_metrics: BlobStoreMetrics,
}
167
168impl<V, T, S> PoolInner<V, T, S>
171where
172 V: TransactionValidator,
173 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174 S: BlobStore,
175{
176 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178 Self {
179 identifiers: Default::default(),
180 validator,
181 event_listener: Default::default(),
182 has_event_listeners: AtomicBool::new(false),
183 pool: RwLock::new(TxPool::new(ordering, config.clone())),
184 pending_transaction_listener: Default::default(),
185 transaction_listener: Default::default(),
186 blob_transaction_sidecar_listener: Default::default(),
187 config,
188 blob_store,
189 blob_store_metrics: Default::default(),
190 }
191 }
192
193 pub const fn blob_store(&self) -> &S {
195 &self.blob_store
196 }
197
198 pub fn size(&self) -> PoolSize {
200 self.get_pool_data().size()
201 }
202
203 pub fn block_info(&self) -> BlockInfo {
205 self.get_pool_data().block_info()
206 }
207 pub fn set_block_info(&self, info: BlockInfo) {
212 let outcome = self.pool.write().set_block_info(info);
213
214 self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216 }
217
218 pub fn get_sender_id(&self, addr: Address) -> SenderId {
225 self.identifiers.write().sender_id_or_create(addr)
226 }
227
228 pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
233 self.identifiers.read().sender_id(addr)
234 }
235
236 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
238 self.identifiers.write().sender_ids_or_create(addrs)
239 }
240
241 pub fn unique_senders(&self) -> AddressSet {
243 self.get_pool_data().unique_senders()
244 }
245
246 fn changed_senders(
249 &self,
250 accs: impl Iterator<Item = ChangedAccount>,
251 ) -> FxHashMap<SenderId, SenderInfo> {
252 let identifiers = self.identifiers.read();
253 accs.into_iter()
254 .filter_map(|acc| {
255 let ChangedAccount { address, nonce, balance } = acc;
256 let sender_id = identifiers.sender_id(&address)?;
257 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
258 })
259 .collect()
260 }
261
262 pub const fn config(&self) -> &PoolConfig {
264 &self.config
265 }
266
267 pub const fn validator(&self) -> &V {
269 &self.validator
270 }
271
272 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
275 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
276 let listener = PendingTransactionHashListener { sender, kind };
277
278 let mut listeners = self.pending_transaction_listener.write();
279 listeners.retain(|l| !l.sender.is_closed());
281 listeners.push(listener);
282
283 rx
284 }
285
286 pub fn add_new_transaction_listener(
288 &self,
289 kind: TransactionListenerKind,
290 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
291 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
292 let listener = TransactionListener { sender, kind };
293
294 let mut listeners = self.transaction_listener.write();
295 listeners.retain(|l| !l.sender.is_closed());
297 listeners.push(listener);
298
299 rx
300 }
301 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
304 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
305 let listener = BlobTransactionSidecarListener { sender };
306 self.blob_transaction_sidecar_listener.lock().push(listener);
307 rx
308 }
309
310 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
313 if !self.get_pool_data().contains(&tx_hash) {
314 return None
315 }
316 let mut listener = self.event_listener.write();
317 let events = listener.subscribe(tx_hash);
318 self.mark_event_listener_installed();
319 Some(events)
320 }
321
322 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
324 let mut listener = self.event_listener.write();
325 let events = listener.subscribe_all();
326 self.mark_event_listener_installed();
327 events
328 }
329
330 #[inline]
331 fn has_event_listeners(&self) -> bool {
332 self.has_event_listeners.load(Ordering::Relaxed)
333 }
334
335 #[inline]
336 fn mark_event_listener_installed(&self) {
337 self.has_event_listeners.store(true, Ordering::Relaxed);
338 }
339
340 #[inline]
341 fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
342 if listener.is_empty() {
343 self.has_event_listeners.store(false, Ordering::Relaxed);
344 }
345 }
346
347 #[inline]
348 fn with_event_listener<F>(&self, emit: F)
349 where
350 F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
351 {
352 if !self.has_event_listeners() {
353 return
354 }
355 let mut listener = self.event_listener.write();
356 if !listener.is_empty() {
357 emit(&mut listener);
358 }
359 self.update_event_listener_state(&listener);
360 }
361
362 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
364 self.pool.read()
365 }
366
367 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
369 let mut out = Vec::new();
370 self.append_pooled_transactions(&mut out);
371 out
372 }
373
374 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
376 let mut out = Vec::new();
377 self.append_pooled_transactions_hashes(&mut out);
378 out
379 }
380
381 pub fn pooled_transactions_max(
383 &self,
384 max: usize,
385 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
386 if max == 0 {
387 return Vec::new()
388 }
389
390 let pool = self.get_pool_data();
391 let mut out = Vec::with_capacity(max.min(pool.all().len()));
392 out.extend(pool.all().transactions_iter().filter(|tx| tx.propagate).take(max).cloned());
393 out
394 }
395
396 pub fn append_pooled_transactions(
398 &self,
399 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
400 ) {
401 out.extend(
402 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
403 );
404 }
405
406 pub fn append_pooled_transaction_elements(
409 &self,
410 tx_hashes: &[TxHash],
411 limit: GetPooledTransactionLimit,
412 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
413 ) where
414 <V as TransactionValidator>::Transaction: EthPoolTransaction,
415 {
416 let transactions = self.get_all_propagatable(tx_hashes);
417 let mut size = 0;
418 for transaction in transactions {
419 let encoded_len = transaction.encoded_length();
420 let Some(pooled) = self.to_pooled_transaction(transaction) else {
421 continue;
422 };
423
424 size += encoded_len;
425 out.push(pooled.into_inner());
426
427 if limit.exceeds(size) {
428 break
429 }
430 }
431 }
432
433 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
436 out.extend(
437 self.get_pool_data()
438 .all()
439 .transactions_iter()
440 .filter(|tx| tx.propagate)
441 .map(|tx| *tx.hash()),
442 );
443 }
444
445 pub fn append_pooled_transactions_max(
448 &self,
449 max: usize,
450 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
451 ) {
452 out.extend(
453 self.get_pool_data()
454 .all()
455 .transactions_iter()
456 .filter(|tx| tx.propagate)
457 .take(max)
458 .cloned(),
459 );
460 }
461
462 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
464 if max == 0 {
465 return Vec::new();
466 }
467
468 let pool = self.get_pool_data();
469 let mut out = Vec::with_capacity(max.min(pool.all().len()));
470 out.extend(
471 pool.all().transactions_iter().filter(|tx| tx.propagate).take(max).map(|tx| *tx.hash()),
472 );
473 out
474 }
475
476 fn to_pooled_transaction(
481 &self,
482 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
483 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
484 where
485 <V as TransactionValidator>::Transaction: EthPoolTransaction,
486 {
487 if transaction.is_eip4844() {
488 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
489 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
490 } else {
491 transaction
492 .transaction
493 .clone_into_pooled()
494 .inspect_err(|err| {
495 debug!(
496 target: "txpool", %err,
497 "failed to convert transaction to pooled element; skipping",
498 );
499 })
500 .ok()
501 }
502 }
503
504 pub fn get_pooled_transaction_elements(
507 &self,
508 tx_hashes: Vec<TxHash>,
509 limit: GetPooledTransactionLimit,
510 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
511 where
512 <V as TransactionValidator>::Transaction: EthPoolTransaction,
513 {
514 let mut elements = Vec::new();
515 self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
516 elements.shrink_to_fit();
517 elements
518 }
519
520 pub fn get_pooled_transaction_element(
522 &self,
523 tx_hash: TxHash,
524 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
525 where
526 <V as TransactionValidator>::Transaction: EthPoolTransaction,
527 {
528 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
529 }
530
531 pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
533 trace!(target: "txpool", ?update, "updating pool on canonical state change");
534
535 let block_info = update.block_info();
536 let CanonicalStateUpdate {
537 new_tip, changed_accounts, mined_transactions, update_kind, ..
538 } = update;
539 self.validator.on_new_head_block(new_tip);
540
541 let changed_senders = self.changed_senders(changed_accounts.into_iter());
542
543 let outcome = self.pool.write().on_canonical_state_change(
545 block_info,
546 mined_transactions,
547 changed_senders,
548 update_kind,
549 );
550
551 self.delete_discarded_blobs(outcome.discarded.iter());
553
554 self.notify_on_new_state(outcome);
556 }
557
558 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
564 let changed_senders = self.changed_senders(accounts.into_iter());
565 let UpdateOutcome { promoted, discarded } =
566 self.pool.write().update_accounts(changed_senders);
567
568 self.notify_on_transaction_updates(promoted, discarded);
569 }
570
571 fn add_transaction(
579 &self,
580 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
581 origin: TransactionOrigin,
582 tx: TransactionValidationOutcome<T::Transaction>,
583 ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
584 match tx {
585 TransactionValidationOutcome::Valid {
586 balance,
587 state_nonce,
588 transaction,
589 propagate,
590 bytecode_hash,
591 authorities,
592 } => {
593 let sender_id = self.get_sender_id(transaction.sender());
594 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
595
596 let (transaction, blob_sidecar) = match transaction {
598 ValidTransaction::Valid(tx) => (tx, None),
599 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
600 debug_assert!(
601 transaction.is_eip4844(),
602 "validator returned sidecar for non EIP-4844 transaction"
603 );
604 (transaction, Some(sidecar))
605 }
606 };
607
608 let tx = ValidPoolTransaction {
609 transaction,
610 transaction_id,
611 propagate,
612 timestamp: Instant::now(),
613 origin,
614 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
615 };
616
617 let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
618 Ok(added) => added,
619 Err(err) => return (Err(err), None),
620 };
621 let hash = *added.hash();
622 let state = added.transaction_state();
623
624 let meta = AddedTransactionMeta { added, blob_sidecar };
625
626 (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
627 }
628 TransactionValidationOutcome::Invalid(tx, err) => {
629 self.with_event_listener(|listener| listener.invalid(tx.hash()));
630 (Err(PoolError::new(*tx.hash(), err)), None)
631 }
632 TransactionValidationOutcome::Error(tx_hash, err) => {
633 self.with_event_listener(|listener| listener.discarded(&tx_hash));
634 (Err(PoolError::other(tx_hash, err)), None)
635 }
636 }
637 }
638
639 pub fn add_transaction_and_subscribe(
641 &self,
642 origin: TransactionOrigin,
643 tx: TransactionValidationOutcome<T::Transaction>,
644 ) -> PoolResult<TransactionEvents> {
645 let listener = {
646 let mut listener = self.event_listener.write();
647 let events = listener.subscribe(tx.tx_hash());
648 self.mark_event_listener_installed();
649 events
650 };
651 let mut results = self.add_transactions(origin, std::iter::once(tx));
652 results.pop().expect("result length is the same as the input")?;
653 Ok(listener)
654 }
655
656 pub fn add_transactions(
661 &self,
662 origin: TransactionOrigin,
663 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
664 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
665 self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
666 }
667
668 pub fn add_transactions_with_origins(
671 &self,
672 transactions: impl IntoIterator<
673 Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
674 >,
675 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
676 let (mut results, added_metas, discarded) = {
678 let mut pool = self.pool.write();
679 let mut added_metas = Vec::new();
680
681 let results = transactions
682 .into_iter()
683 .map(|(origin, tx)| {
684 let (result, meta) = self.add_transaction(&mut pool, origin, tx);
685
686 if result.is_ok() &&
688 let Some(meta) = meta
689 {
690 added_metas.push(meta);
691 }
692
693 result
694 })
695 .collect::<Vec<_>>();
696
697 let discarded = if results.iter().any(Result::is_ok) {
699 let discarded = pool.discard_worst();
700 pool.update_size_metrics();
701 discarded
702 } else {
703 Default::default()
704 };
705
706 (results, added_metas, discarded)
707 };
708
709 for meta in added_metas {
710 self.on_added_transaction(meta);
711 }
712
713 if !discarded.is_empty() {
714 self.delete_discarded_blobs(discarded.iter());
716 self.with_event_listener(|listener| listener.discarded_many(&discarded));
717
718 let discarded_hashes =
719 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
720
721 for res in &mut results {
724 if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
725 discarded_hashes.contains(hash)
726 {
727 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
728 }
729 }
730 };
731
732 results
733 }
734
735 fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
740 if let Some(sidecar) = meta.blob_sidecar {
742 let hash = *meta.added.hash();
743 self.on_new_blob_sidecar(&hash, &sidecar);
744 self.insert_blob(hash, sidecar);
745 }
746
747 if let Some(replaced) = meta.added.replaced_blob_transaction() {
749 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
750 self.delete_blob(replaced);
751 }
752
753 if let Some(discarded) = meta.added.discarded_transactions() {
755 self.delete_discarded_blobs(discarded.iter());
756 }
757
758 if let Some(pending) = meta.added.as_pending() {
760 self.on_new_pending_transaction(pending);
761 }
762
763 self.notify_event_listeners(&meta.added);
765
766 self.on_new_transaction(meta.added.into_new_transaction_event());
768 }
769
770 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
779 let mut needs_cleanup = false;
780
781 {
782 let listeners = self.pending_transaction_listener.read();
783 for listener in listeners.iter() {
784 if !listener.send_all(pending.pending_transactions(listener.kind)) {
785 needs_cleanup = true;
786 }
787 }
788 }
789
790 if needs_cleanup {
792 self.pending_transaction_listener
793 .write()
794 .retain(|listener| !listener.sender.is_closed());
795 }
796 }
797
798 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
807 let mut needs_cleanup = false;
808
809 {
810 let listeners = self.transaction_listener.read();
811 for listener in listeners.iter() {
812 if listener.kind.is_propagate_only() && !event.transaction.propagate {
813 if listener.sender.is_closed() {
814 needs_cleanup = true;
815 }
816 continue
818 }
819
820 if !listener.send(event.clone()) {
821 needs_cleanup = true;
822 }
823 }
824 }
825
826 if needs_cleanup {
828 self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
829 }
830 }
831
832 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
834 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
835 if sidecar_listeners.is_empty() {
836 return
837 }
838 let sidecar = Arc::new(sidecar.clone());
839 sidecar_listeners.retain_mut(|listener| {
840 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
841 match listener.sender.try_send(new_blob_event) {
842 Ok(()) => true,
843 Err(err) => {
844 if matches!(err, mpsc::error::TrySendError::Full(_)) {
845 debug!(
846 target: "txpool",
847 "[{:?}] failed to send blob sidecar; channel full",
848 sidecar,
849 );
850 true
851 } else {
852 false
853 }
854 }
855 }
856 })
857 }
858
859 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
861 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
862
863 let mut needs_pending_cleanup = false;
865 {
866 let listeners = self.pending_transaction_listener.read();
867 for listener in listeners.iter() {
868 if !listener.send_all(outcome.pending_transactions(listener.kind)) {
869 needs_pending_cleanup = true;
870 }
871 }
872 }
873 if needs_pending_cleanup {
874 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
875 }
876
877 let mut needs_tx_cleanup = false;
879 {
880 let listeners = self.transaction_listener.read();
881 for listener in listeners.iter() {
882 if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
883 needs_tx_cleanup = true;
884 }
885 }
886 }
887 if needs_tx_cleanup {
888 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
889 }
890
891 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
892
893 self.with_event_listener(|listener| {
895 for tx in &mined {
896 listener.mined(tx, block_hash);
897 }
898 for tx in &promoted {
899 listener.pending(tx.hash(), None);
900 }
901 for tx in &discarded {
902 listener.discarded(tx.hash());
903 }
904 })
905 }
906
907 pub fn notify_on_transaction_updates(
916 &self,
917 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
918 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
919 ) {
920 if !promoted.is_empty() {
922 let mut needs_pending_cleanup = false;
923 {
924 let listeners = self.pending_transaction_listener.read();
925 for listener in listeners.iter() {
926 let promoted_hashes = promoted.iter().filter_map(|tx| {
927 if listener.kind.is_propagate_only() && !tx.propagate {
928 None
929 } else {
930 Some(*tx.hash())
931 }
932 });
933 if !listener.send_all(promoted_hashes) {
934 needs_pending_cleanup = true;
935 }
936 }
937 }
938 if needs_pending_cleanup {
939 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
940 }
941
942 let mut needs_tx_cleanup = false;
944 {
945 let listeners = self.transaction_listener.read();
946 for listener in listeners.iter() {
947 let promoted_txs = promoted.iter().filter_map(|tx| {
948 if listener.kind.is_propagate_only() && !tx.propagate {
949 None
950 } else {
951 Some(NewTransactionEvent::pending(tx.clone()))
952 }
953 });
954 if !listener.send_all(promoted_txs) {
955 needs_tx_cleanup = true;
956 }
957 }
958 }
959 if needs_tx_cleanup {
960 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
961 }
962 }
963
964 self.with_event_listener(|listener| {
965 for tx in &promoted {
966 listener.pending(tx.hash(), None);
967 }
968 for tx in &discarded {
969 listener.discarded(tx.hash());
970 }
971 });
972
973 if !discarded.is_empty() {
974 self.delete_discarded_blobs(discarded.iter());
977 }
978 }
979
980 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
989 self.with_event_listener(|listener| match tx {
990 AddedTransaction::Pending(tx) => {
991 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
992
993 listener.pending(transaction.hash(), replaced.clone());
994 for tx in promoted {
995 listener.pending(tx.hash(), None);
996 }
997 for tx in discarded {
998 listener.discarded(tx.hash());
999 }
1000 }
1001 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
1002 listener.queued(transaction.hash(), queued_reason.clone());
1003 if let Some(replaced) = replaced {
1004 listener.replaced(replaced.clone(), *transaction.hash());
1005 }
1006 }
1007 });
1008 }
1009
1010 pub fn best_transactions(&self) -> BestTransactions<T> {
1012 self.get_pool_data().best_transactions()
1013 }
1014
1015 pub fn best_transactions_with_attributes(
1018 &self,
1019 best_transactions_attributes: BestTransactionsAttributes,
1020 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1021 {
1022 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1023 }
1024
1025 pub fn pending_transactions_max(
1027 &self,
1028 max: usize,
1029 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1030 self.get_pool_data().pending_transactions_iter().take(max).collect()
1031 }
1032
1033 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1035 self.get_pool_data().pending_transactions()
1036 }
1037
1038 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1040 self.get_pool_data().queued_transactions()
1041 }
1042
1043 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1045 let pool = self.get_pool_data();
1046 AllPoolTransactions {
1047 pending: pool.pending_transactions(),
1048 queued: pool.queued_transactions(),
1049 }
1050 }
1051
1052 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1054 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1055 }
1056
1057 pub fn remove_transactions(
1062 &self,
1063 hashes: Vec<TxHash>,
1064 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1065 if hashes.is_empty() {
1066 return Vec::new()
1067 }
1068 let removed = self.pool.write().remove_transactions(hashes);
1069
1070 self.with_event_listener(|listener| listener.discarded_many(&removed));
1071
1072 removed
1073 }
1074
1075 pub fn remove_transactions_and_descendants(
1078 &self,
1079 hashes: Vec<TxHash>,
1080 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1081 if hashes.is_empty() {
1082 return Vec::new()
1083 }
1084 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1085
1086 self.with_event_listener(|listener| {
1087 for tx in &removed {
1088 listener.discarded(tx.hash());
1089 }
1090 });
1091
1092 removed
1093 }
1094
1095 pub fn remove_transactions_by_sender(
1097 &self,
1098 sender: Address,
1099 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1100 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1101 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1102
1103 self.with_event_listener(|listener| listener.discarded_many(&removed));
1104
1105 removed
1106 }
1107
1108 pub fn prune_transactions(
1113 &self,
1114 hashes: Vec<TxHash>,
1115 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1116 if hashes.is_empty() {
1117 return Vec::new()
1118 }
1119
1120 self.pool.write().prune_transactions(hashes)
1121 }
1122
1123 pub fn retain_unknown<A>(&self, announcement: &mut A)
1125 where
1126 A: HandleMempoolData,
1127 {
1128 if announcement.is_empty() {
1129 return
1130 }
1131 let pool = self.get_pool_data();
1132 announcement.retain_by_hash(|tx| !pool.contains(tx))
1133 }
1134
1135 pub fn retain_contains<A>(&self, announcement: &mut A)
1137 where
1138 A: HandleMempoolData,
1139 {
1140 if announcement.is_empty() {
1141 return
1142 }
1143 let pool = self.get_pool_data();
1144 announcement.retain_by_hash(|tx| pool.contains(tx))
1145 }
1146
1147 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1149 self.get_pool_data().get(tx_hash)
1150 }
1151
1152 pub fn get_transactions_by_sender(
1154 &self,
1155 sender: Address,
1156 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1157 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1158 self.get_pool_data().get_transactions_by_sender(sender_id)
1159 }
1160
1161 pub fn get_pending_transaction_by_sender_and_nonce(
1163 &self,
1164 sender: Address,
1165 nonce: u64,
1166 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1167 let sender_id = self.sender_id(&sender)?;
1168 self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1169 }
1170
1171 pub fn get_queued_transactions_by_sender(
1173 &self,
1174 sender: Address,
1175 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1176 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1177 self.get_pool_data().queued_txs_by_sender(sender_id)
1178 }
1179
1180 pub fn pending_transactions_with_predicate(
1182 &self,
1183 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1184 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1185 self.get_pool_data().pending_transactions_with_predicate(predicate)
1186 }
1187
1188 pub fn get_pending_transactions_by_sender(
1190 &self,
1191 sender: Address,
1192 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1193 let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1194 self.get_pool_data().pending_txs_by_sender(sender_id)
1195 }
1196
1197 pub fn get_highest_transaction_by_sender(
1199 &self,
1200 sender: Address,
1201 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1202 let sender_id = self.sender_id(&sender)?;
1203 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1204 }
1205
1206 pub fn get_highest_consecutive_transaction_by_sender(
1208 &self,
1209 sender: Address,
1210 on_chain_nonce: u64,
1211 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1212 let sender_id = self.sender_id(&sender)?;
1213 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1214 sender_id.into_transaction_id(on_chain_nonce),
1215 )
1216 }
1217
1218 pub fn get_transaction_by_transaction_id(
1220 &self,
1221 transaction_id: &TransactionId,
1222 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1223 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1224 }
1225
1226 pub fn get_transactions_by_origin(
1228 &self,
1229 origin: TransactionOrigin,
1230 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1231 self.get_pool_data()
1232 .all()
1233 .transactions_iter()
1234 .filter(|tx| tx.origin == origin)
1235 .cloned()
1236 .collect()
1237 }
1238
1239 pub fn get_pending_transactions_by_origin(
1241 &self,
1242 origin: TransactionOrigin,
1243 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1244 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1245 }
1246
1247 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1251 if txs.is_empty() {
1252 return Vec::new()
1253 }
1254 self.get_pool_data().get_all(txs).collect()
1255 }
1256
1257 fn get_all_propagatable(
1261 &self,
1262 txs: &[TxHash],
1263 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1264 if txs.is_empty() {
1265 return Vec::new()
1266 }
1267 let pool = self.get_pool_data();
1268 txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1269 }
1270
1271 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1273 if txs.is_empty() {
1274 return
1275 }
1276 self.with_event_listener(|listener| {
1277 txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1278 });
1279 }
1280
1281 pub fn len(&self) -> usize {
1283 self.get_pool_data().len()
1284 }
1285
1286 pub fn is_empty(&self) -> bool {
1288 self.get_pool_data().is_empty()
1289 }
1290
1291 pub fn is_exceeded(&self) -> bool {
1293 self.pool.read().is_exceeded()
1294 }
1295
1296 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1298 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1299 if let Err(err) = self.blob_store.insert(hash, blob) {
1300 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1301 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1302 }
1303 self.update_blob_store_metrics();
1304 }
1305
1306 pub fn delete_blob(&self, blob: TxHash) {
1308 let _ = self.blob_store.delete(blob);
1309 }
1310
1311 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1313 let _ = self.blob_store.delete_all(txs);
1314 }
1315
1316 pub fn cleanup_blobs(&self) {
1318 let stat = self.blob_store.cleanup();
1319 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1320 self.update_blob_store_metrics();
1321 }
1322
1323 fn update_blob_store_metrics(&self) {
1324 if let Some(data_size) = self.blob_store.data_size_hint() {
1325 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1326 }
1327 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1328 }
1329
1330 fn delete_discarded_blobs<'a>(
1332 &'a self,
1333 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1334 ) {
1335 let blob_txs = transactions
1336 .into_iter()
1337 .filter(|tx| tx.transaction.is_eip4844())
1338 .map(|tx| *tx.hash())
1339 .collect();
1340 self.delete_blobs(blob_txs);
1341 }
1342}
1343
1344impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1346 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1347 }
1348}
1349
1350#[derive(Debug)]
1355struct AddedTransactionMeta<T: PoolTransaction> {
1356 added: AddedTransaction<T>,
1358 blob_sidecar: Option<BlobTransactionSidecarVariant>,
1360}
1361
1362#[derive(Debug, Clone)]
1364pub struct AddedPendingTransaction<T: PoolTransaction> {
1365 pub transaction: Arc<ValidPoolTransaction<T>>,
1367 pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1369 pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1371 pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1373}
1374
1375impl<T: PoolTransaction> AddedPendingTransaction<T> {
1376 pub(crate) fn pending_transactions(
1382 &self,
1383 kind: TransactionListenerKind,
1384 ) -> impl Iterator<Item = B256> + '_ {
1385 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1386 PendingTransactionIter { kind, iter }
1387 }
1388}
1389
1390pub(crate) struct PendingTransactionIter<Iter> {
1391 kind: TransactionListenerKind,
1392 iter: Iter,
1393}
1394
1395impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1396where
1397 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1398 T: PoolTransaction + 'a,
1399{
1400 type Item = B256;
1401
1402 fn next(&mut self) -> Option<Self::Item> {
1403 loop {
1404 let next = self.iter.next()?;
1405 if self.kind.is_propagate_only() && !next.propagate {
1406 continue
1407 }
1408 return Some(*next.hash())
1409 }
1410 }
1411}
1412
1413pub(crate) struct FullPendingTransactionIter<Iter> {
1415 kind: TransactionListenerKind,
1416 iter: Iter,
1417}
1418
1419impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1420where
1421 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1422 T: PoolTransaction + 'a,
1423{
1424 type Item = NewTransactionEvent<T>;
1425
1426 fn next(&mut self) -> Option<Self::Item> {
1427 loop {
1428 let next = self.iter.next()?;
1429 if self.kind.is_propagate_only() && !next.propagate {
1430 continue
1431 }
1432 return Some(NewTransactionEvent {
1433 subpool: SubPool::Pending,
1434 transaction: next.clone(),
1435 })
1436 }
1437 }
1438}
1439
1440#[derive(Debug, Clone)]
1442pub enum AddedTransaction<T: PoolTransaction> {
1443 Pending(AddedPendingTransaction<T>),
1445 Parked {
1448 transaction: Arc<ValidPoolTransaction<T>>,
1450 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1452 subpool: SubPool,
1454 queued_reason: Option<QueuedReason>,
1456 },
1457}
1458
1459impl<T: PoolTransaction> AddedTransaction<T> {
1460 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1462 match self {
1463 Self::Pending(tx) => Some(tx),
1464 _ => None,
1465 }
1466 }
1467
1468 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1470 match self {
1471 Self::Pending(tx) => tx.replaced.as_ref(),
1472 Self::Parked { replaced, .. } => replaced.as_ref(),
1473 }
1474 }
1475
1476 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1478 match self {
1479 Self::Pending(tx) => Some(&tx.discarded),
1480 Self::Parked { .. } => None,
1481 }
1482 }
1483
1484 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1486 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1487 }
1488
1489 pub fn hash(&self) -> &TxHash {
1491 match self {
1492 Self::Pending(tx) => tx.transaction.hash(),
1493 Self::Parked { transaction, .. } => transaction.hash(),
1494 }
1495 }
1496
1497 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1499 match self {
1500 Self::Pending(tx) => {
1501 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1502 }
1503 Self::Parked { transaction, subpool, .. } => {
1504 NewTransactionEvent { transaction, subpool }
1505 }
1506 }
1507 }
1508
1509 pub(crate) const fn subpool(&self) -> SubPool {
1511 match self {
1512 Self::Pending(_) => SubPool::Pending,
1513 Self::Parked { subpool, .. } => *subpool,
1514 }
1515 }
1516
1517 #[cfg(test)]
1519 pub(crate) fn id(&self) -> &TransactionId {
1520 match self {
1521 Self::Pending(added) => added.transaction.id(),
1522 Self::Parked { transaction, .. } => transaction.id(),
1523 }
1524 }
1525
1526 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1528 match self {
1529 Self::Pending(_) => None,
1530 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1531 }
1532 }
1533
1534 pub fn transaction_state(&self) -> AddedTransactionState {
1536 match self.subpool() {
1537 SubPool::Pending => AddedTransactionState::Pending,
1538 _ => {
1539 if let Some(reason) = self.queued_reason() {
1542 AddedTransactionState::Queued(reason.clone())
1543 } else {
1544 AddedTransactionState::Queued(QueuedReason::NonceGap)
1546 }
1547 }
1548 }
1549 }
1550}
1551
/// The reason a transaction is queued rather than pending.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedReason {
    /// There is a gap in the nonce sequence.
    NonceGap,
    /// Ancestor transactions are parked.
    ParkedAncestors,
    /// The balance is insufficient.
    InsufficientBalance,
    /// The transaction uses too much gas.
    TooMuchGas,
    /// The base fee requirement is not met.
    InsufficientBaseFee,
    /// The blob fee requirement is not met.
    InsufficientBlobFee,
}
1568
1569#[derive(Debug, Clone, PartialEq, Eq)]
1571pub enum AddedTransactionState {
1572 Pending,
1574 Queued(QueuedReason),
1576}
1577
1578impl AddedTransactionState {
1579 pub const fn is_queued(&self) -> bool {
1581 matches!(self, Self::Queued(_))
1582 }
1583
1584 pub const fn is_pending(&self) -> bool {
1586 matches!(self, Self::Pending)
1587 }
1588
1589 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1591 match self {
1592 Self::Queued(reason) => Some(reason),
1593 Self::Pending => None,
1594 }
1595 }
1596}
1597
1598#[derive(Debug, Clone, PartialEq, Eq)]
1600pub struct AddedTransactionOutcome {
1601 pub hash: TxHash,
1603 pub state: AddedTransactionState,
1605}
1606
1607impl AddedTransactionOutcome {
1608 pub const fn is_queued(&self) -> bool {
1610 self.state.is_queued()
1611 }
1612
1613 pub const fn is_pending(&self) -> bool {
1615 self.state.is_pending()
1616 }
1617}
1618
1619#[derive(Debug)]
1621pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1622 pub(crate) block_hash: B256,
1624 pub(crate) mined: Vec<TxHash>,
1626 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1628 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1630}
1631
1632impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1633 pub(crate) fn pending_transactions(
1639 &self,
1640 kind: TransactionListenerKind,
1641 ) -> impl Iterator<Item = B256> + '_ {
1642 let iter = self.promoted.iter();
1643 PendingTransactionIter { kind, iter }
1644 }
1645
1646 pub(crate) fn full_pending_transactions(
1652 &self,
1653 kind: TransactionListenerKind,
1654 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1655 let iter = self.promoted.iter();
1656 FullPendingTransactionIter { kind, iter }
1657 }
1658}
1659
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    // Adds more blob transactions than the blob sub-pool limit allows and checks that the
    // pool's blob store ends up equal to a reference store that only received the sidecars of
    // the first `max_txs` transactions.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Read the hex-encoded blob from the crate's test data.
        let blob_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json");
        let json_content =
            fs::read_to_string(blob_path).expect("Failed to read the blob data file");
        let json_value: serde_json::Value =
            serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
        let blob_hex = json_value
            .get("data")
            .unwrap()
            .as_str()
            .expect("Data is not a valid string")
            .to_string();
        let blobs = vec![blob_hex];

        // Wrap the blob in a sidecar that every transaction below shares.
        let blob_sidecar = BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap();
        let sidecar = BlobTransactionSidecarVariant::Eip4844(blob_sidecar);

        // Blob sub-pool limit used by the pool under test.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        let config = PoolConfig { blob_limit, ..Default::default() };
        let test_pool = &TestPoolBuilder::default().with_config(config).pool;

        let block_info = BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() };
        test_pool.set_block_info(block_info);

        // Reference store that only receives the sidecars expected to remain.
        let blob_store = InMemoryBlobStore::default();

        // Add ten more transactions than the limit allows.
        let total_txs = blob_limit.max_txs + 10;
        for n in 0..total_txs {
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
            tx.set_size(1_844_674_407_370_951);

            // Only the transactions within the limit are mirrored into the reference store.
            let within_limit = n < blob_limit.max_txs;
            if within_limit {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            let outcome = TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::ValidWithSidecar {
                    transaction: tx,
                    sidecar: sidecar.clone(),
                },
                propagate: true,
                authorities: None,
            };
            test_pool.add_transactions(TransactionOrigin::External, [outcome]);
        }

        // The blob sub-pool holds exactly `max_txs` transactions.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // The reported blob size is the per-transaction size times 1000.
        assert_eq!(test_pool.size().blob_size, 1_844_674_407_370_951_000);

        // The pool's blob store matches the reference store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    // Adding a transaction with an authority list must register the authority address with
    // the sender identifiers.
    #[test]
    fn test_auths_stored_in_identifiers() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let auth = Address::new([1; 20]);
        let tx = MockTransaction::eip7702();

        let outcome = TransactionValidationOutcome::Valid {
            balance: U256::from(1_000),
            state_nonce: 0,
            bytecode_hash: None,
            transaction: ValidTransaction::Valid(tx),
            propagate: true,
            authorities: Some(vec![auth]),
        };
        test_pool.add_transactions(TransactionOrigin::Local, [outcome]);

        let known_id = test_pool.identifiers.read().sender_id(&auth);
        assert_eq!(known_id, Some(SenderId::from(1)));
    }

    // Read-only and removal queries for an address the pool has never seen must not cause a
    // sender id to be assigned to it.
    #[test]
    fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
        let sender = Address::new([9; 20]);

        // The address is unknown before any query runs.
        assert_eq!(test_pool.sender_id(&sender), None);

        assert!(test_pool.get_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_pending_transaction_by_sender_and_nonce(sender, 0).is_none());
        assert!(test_pool.get_queued_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_pending_transactions_by_sender(sender).is_empty());
        assert!(test_pool.get_highest_transaction_by_sender(sender).is_none());
        assert!(test_pool.get_highest_consecutive_transaction_by_sender(sender, 0).is_none());
        assert!(test_pool.remove_transactions_by_sender(sender).is_empty());

        // ... and it is still unknown afterwards.
        assert_eq!(test_pool.sender_id(&sender), None);
    }
}