1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{
92 map::{AddressSet, HashSet},
93 Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103 fmt,
104 sync::{
105 atomic::{AtomicBool, Ordering},
106 Arc,
107 },
108 time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
132pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
134pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
136
137const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
139pub struct PoolInner<V, T, S>
141where
142 T: TransactionOrdering,
143{
144 identifiers: RwLock<SenderIdentifiers>,
146 validator: V,
148 blob_store: S,
150 pool: RwLock<TxPool<T>>,
152 config: PoolConfig,
154 event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
156 has_event_listeners: AtomicBool,
158 pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
160 transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
162 blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
164 blob_store_metrics: BlobStoreMetrics,
166}
167
168impl<V, T, S> PoolInner<V, T, S>
171where
172 V: TransactionValidator,
173 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174 S: BlobStore,
175{
176 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178 Self {
179 identifiers: Default::default(),
180 validator,
181 event_listener: Default::default(),
182 has_event_listeners: AtomicBool::new(false),
183 pool: RwLock::new(TxPool::new(ordering, config.clone())),
184 pending_transaction_listener: Default::default(),
185 transaction_listener: Default::default(),
186 blob_transaction_sidecar_listener: Default::default(),
187 config,
188 blob_store,
189 blob_store_metrics: Default::default(),
190 }
191 }
192
193 pub const fn blob_store(&self) -> &S {
195 &self.blob_store
196 }
197
198 pub fn size(&self) -> PoolSize {
200 self.get_pool_data().size()
201 }
202
203 pub fn block_info(&self) -> BlockInfo {
205 self.get_pool_data().block_info()
206 }
207 pub fn set_block_info(&self, info: BlockInfo) {
212 let outcome = self.pool.write().set_block_info(info);
213
214 self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216 }
217
218 pub fn get_sender_id(&self, addr: Address) -> SenderId {
220 self.identifiers.write().sender_id_or_create(addr)
221 }
222
223 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
225 self.identifiers.write().sender_ids_or_create(addrs)
226 }
227
228 pub fn unique_senders(&self) -> AddressSet {
230 self.get_pool_data().unique_senders()
231 }
232
233 fn changed_senders(
236 &self,
237 accs: impl Iterator<Item = ChangedAccount>,
238 ) -> FxHashMap<SenderId, SenderInfo> {
239 let identifiers = self.identifiers.read();
240 accs.into_iter()
241 .filter_map(|acc| {
242 let ChangedAccount { address, nonce, balance } = acc;
243 let sender_id = identifiers.sender_id(&address)?;
244 Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
245 })
246 .collect()
247 }
248
249 pub const fn config(&self) -> &PoolConfig {
251 &self.config
252 }
253
254 pub const fn validator(&self) -> &V {
256 &self.validator
257 }
258
259 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
262 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
263 let listener = PendingTransactionHashListener { sender, kind };
264
265 let mut listeners = self.pending_transaction_listener.write();
266 listeners.retain(|l| !l.sender.is_closed());
268 listeners.push(listener);
269
270 rx
271 }
272
273 pub fn add_new_transaction_listener(
275 &self,
276 kind: TransactionListenerKind,
277 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
278 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
279 let listener = TransactionListener { sender, kind };
280
281 let mut listeners = self.transaction_listener.write();
282 listeners.retain(|l| !l.sender.is_closed());
284 listeners.push(listener);
285
286 rx
287 }
288 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
291 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
292 let listener = BlobTransactionSidecarListener { sender };
293 self.blob_transaction_sidecar_listener.lock().push(listener);
294 rx
295 }
296
297 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
300 if !self.get_pool_data().contains(&tx_hash) {
301 return None
302 }
303 let mut listener = self.event_listener.write();
304 let events = listener.subscribe(tx_hash);
305 self.mark_event_listener_installed();
306 Some(events)
307 }
308
309 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
311 let mut listener = self.event_listener.write();
312 let events = listener.subscribe_all();
313 self.mark_event_listener_installed();
314 events
315 }
316
317 #[inline]
318 fn has_event_listeners(&self) -> bool {
319 self.has_event_listeners.load(Ordering::Relaxed)
320 }
321
322 #[inline]
323 fn mark_event_listener_installed(&self) {
324 self.has_event_listeners.store(true, Ordering::Relaxed);
325 }
326
327 #[inline]
328 fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
329 if listener.is_empty() {
330 self.has_event_listeners.store(false, Ordering::Relaxed);
331 }
332 }
333
334 #[inline]
335 fn with_event_listener<F>(&self, emit: F)
336 where
337 F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
338 {
339 if !self.has_event_listeners() {
340 return
341 }
342 let mut listener = self.event_listener.write();
343 if !listener.is_empty() {
344 emit(&mut listener);
345 }
346 self.update_event_listener_state(&listener);
347 }
348
349 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
351 self.pool.read()
352 }
353
354 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
356 let mut out = Vec::new();
357 self.append_pooled_transactions(&mut out);
358 out
359 }
360
361 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
363 let mut out = Vec::new();
364 self.append_pooled_transactions_hashes(&mut out);
365 out
366 }
367
368 pub fn pooled_transactions_max(
370 &self,
371 max: usize,
372 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
373 let mut out = Vec::new();
374 self.append_pooled_transactions_max(max, &mut out);
375 out
376 }
377
378 pub fn append_pooled_transactions(
380 &self,
381 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
382 ) {
383 out.extend(
384 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
385 );
386 }
387
388 pub fn append_pooled_transaction_elements(
391 &self,
392 tx_hashes: &[TxHash],
393 limit: GetPooledTransactionLimit,
394 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
395 ) where
396 <V as TransactionValidator>::Transaction: EthPoolTransaction,
397 {
398 let transactions = self.get_all_propagatable(tx_hashes);
399 let mut size = 0;
400 for transaction in transactions {
401 let encoded_len = transaction.encoded_length();
402 let Some(pooled) = self.to_pooled_transaction(transaction) else {
403 continue;
404 };
405
406 size += encoded_len;
407 out.push(pooled.into_inner());
408
409 if limit.exceeds(size) {
410 break
411 }
412 }
413 }
414
415 pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
418 out.extend(
419 self.get_pool_data()
420 .all()
421 .transactions_iter()
422 .filter(|tx| tx.propagate)
423 .map(|tx| *tx.hash()),
424 );
425 }
426
427 pub fn append_pooled_transactions_max(
430 &self,
431 max: usize,
432 out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
433 ) {
434 out.extend(
435 self.get_pool_data()
436 .all()
437 .transactions_iter()
438 .filter(|tx| tx.propagate)
439 .take(max)
440 .cloned(),
441 );
442 }
443
444 pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
446 if max == 0 {
447 return Vec::new();
448 }
449 self.get_pool_data()
450 .all()
451 .transactions_iter()
452 .filter(|tx| tx.propagate)
453 .take(max)
454 .map(|tx| *tx.hash())
455 .collect()
456 }
457
458 fn to_pooled_transaction(
463 &self,
464 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
465 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
466 where
467 <V as TransactionValidator>::Transaction: EthPoolTransaction,
468 {
469 if transaction.is_eip4844() {
470 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
471 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
472 } else {
473 transaction
474 .transaction
475 .clone_into_pooled()
476 .inspect_err(|err| {
477 debug!(
478 target: "txpool", %err,
479 "failed to convert transaction to pooled element; skipping",
480 );
481 })
482 .ok()
483 }
484 }
485
486 pub fn get_pooled_transaction_elements(
489 &self,
490 tx_hashes: Vec<TxHash>,
491 limit: GetPooledTransactionLimit,
492 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
493 where
494 <V as TransactionValidator>::Transaction: EthPoolTransaction,
495 {
496 let mut elements = Vec::new();
497 self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
498 elements.shrink_to_fit();
499 elements
500 }
501
502 pub fn get_pooled_transaction_element(
504 &self,
505 tx_hash: TxHash,
506 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
507 where
508 <V as TransactionValidator>::Transaction: EthPoolTransaction,
509 {
510 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
511 }
512
513 pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
515 trace!(target: "txpool", ?update, "updating pool on canonical state change");
516
517 let block_info = update.block_info();
518 let CanonicalStateUpdate {
519 new_tip, changed_accounts, mined_transactions, update_kind, ..
520 } = update;
521 self.validator.on_new_head_block(new_tip);
522
523 let changed_senders = self.changed_senders(changed_accounts.into_iter());
524
525 let outcome = self.pool.write().on_canonical_state_change(
527 block_info,
528 mined_transactions,
529 changed_senders,
530 update_kind,
531 );
532
533 self.delete_discarded_blobs(outcome.discarded.iter());
535
536 self.notify_on_new_state(outcome);
538 }
539
540 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
546 let changed_senders = self.changed_senders(accounts.into_iter());
547 let UpdateOutcome { promoted, discarded } =
548 self.pool.write().update_accounts(changed_senders);
549
550 self.notify_on_transaction_updates(promoted, discarded);
551 }
552
553 fn add_transaction(
561 &self,
562 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
563 origin: TransactionOrigin,
564 tx: TransactionValidationOutcome<T::Transaction>,
565 ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
566 match tx {
567 TransactionValidationOutcome::Valid {
568 balance,
569 state_nonce,
570 transaction,
571 propagate,
572 bytecode_hash,
573 authorities,
574 } => {
575 let sender_id = self.get_sender_id(transaction.sender());
576 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
577
578 let (transaction, blob_sidecar) = match transaction {
580 ValidTransaction::Valid(tx) => (tx, None),
581 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
582 debug_assert!(
583 transaction.is_eip4844(),
584 "validator returned sidecar for non EIP-4844 transaction"
585 );
586 (transaction, Some(sidecar))
587 }
588 };
589
590 let tx = ValidPoolTransaction {
591 transaction,
592 transaction_id,
593 propagate,
594 timestamp: Instant::now(),
595 origin,
596 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
597 };
598
599 let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
600 Ok(added) => added,
601 Err(err) => return (Err(err), None),
602 };
603 let hash = *added.hash();
604 let state = added.transaction_state();
605
606 let meta = AddedTransactionMeta { added, blob_sidecar };
607
608 (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
609 }
610 TransactionValidationOutcome::Invalid(tx, err) => {
611 self.with_event_listener(|listener| listener.invalid(tx.hash()));
612 (Err(PoolError::new(*tx.hash(), err)), None)
613 }
614 TransactionValidationOutcome::Error(tx_hash, err) => {
615 self.with_event_listener(|listener| listener.discarded(&tx_hash));
616 (Err(PoolError::other(tx_hash, err)), None)
617 }
618 }
619 }
620
621 pub fn add_transaction_and_subscribe(
623 &self,
624 origin: TransactionOrigin,
625 tx: TransactionValidationOutcome<T::Transaction>,
626 ) -> PoolResult<TransactionEvents> {
627 let listener = {
628 let mut listener = self.event_listener.write();
629 let events = listener.subscribe(tx.tx_hash());
630 self.mark_event_listener_installed();
631 events
632 };
633 let mut results = self.add_transactions(origin, std::iter::once(tx));
634 results.pop().expect("result length is the same as the input")?;
635 Ok(listener)
636 }
637
638 pub fn add_transactions(
643 &self,
644 origin: TransactionOrigin,
645 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
646 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
647 self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
648 }
649
650 pub fn add_transactions_with_origins(
653 &self,
654 transactions: impl IntoIterator<
655 Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
656 >,
657 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
658 let (mut results, added_metas, discarded) = {
660 let mut pool = self.pool.write();
661 let mut added_metas = Vec::new();
662
663 let results = transactions
664 .into_iter()
665 .map(|(origin, tx)| {
666 let (result, meta) = self.add_transaction(&mut pool, origin, tx);
667
668 if result.is_ok() &&
670 let Some(meta) = meta
671 {
672 added_metas.push(meta);
673 }
674
675 result
676 })
677 .collect::<Vec<_>>();
678
679 let discarded = if results.iter().any(Result::is_ok) {
681 pool.discard_worst()
682 } else {
683 Default::default()
684 };
685
686 (results, added_metas, discarded)
687 };
688
689 for meta in added_metas {
690 self.on_added_transaction(meta);
691 }
692
693 if !discarded.is_empty() {
694 self.delete_discarded_blobs(discarded.iter());
696 self.with_event_listener(|listener| listener.discarded_many(&discarded));
697
698 let discarded_hashes =
699 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
700
701 for res in &mut results {
704 if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
705 discarded_hashes.contains(hash)
706 {
707 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
708 }
709 }
710 };
711
712 results
713 }
714
715 fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
720 if let Some(sidecar) = meta.blob_sidecar {
722 let hash = *meta.added.hash();
723 self.on_new_blob_sidecar(&hash, &sidecar);
724 self.insert_blob(hash, sidecar);
725 }
726
727 if let Some(replaced) = meta.added.replaced_blob_transaction() {
729 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
730 self.delete_blob(replaced);
731 }
732
733 if let Some(discarded) = meta.added.discarded_transactions() {
735 self.delete_discarded_blobs(discarded.iter());
736 }
737
738 if let Some(pending) = meta.added.as_pending() {
740 self.on_new_pending_transaction(pending);
741 }
742
743 self.notify_event_listeners(&meta.added);
745
746 self.on_new_transaction(meta.added.into_new_transaction_event());
748 }
749
750 pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
759 let mut needs_cleanup = false;
760
761 {
762 let listeners = self.pending_transaction_listener.read();
763 for listener in listeners.iter() {
764 if !listener.send_all(pending.pending_transactions(listener.kind)) {
765 needs_cleanup = true;
766 }
767 }
768 }
769
770 if needs_cleanup {
772 self.pending_transaction_listener
773 .write()
774 .retain(|listener| !listener.sender.is_closed());
775 }
776 }
777
778 pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
787 let mut needs_cleanup = false;
788
789 {
790 let listeners = self.transaction_listener.read();
791 for listener in listeners.iter() {
792 if listener.kind.is_propagate_only() && !event.transaction.propagate {
793 if listener.sender.is_closed() {
794 needs_cleanup = true;
795 }
796 continue
798 }
799
800 if !listener.send(event.clone()) {
801 needs_cleanup = true;
802 }
803 }
804 }
805
806 if needs_cleanup {
808 self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
809 }
810 }
811
812 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
814 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
815 if sidecar_listeners.is_empty() {
816 return
817 }
818 let sidecar = Arc::new(sidecar.clone());
819 sidecar_listeners.retain_mut(|listener| {
820 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
821 match listener.sender.try_send(new_blob_event) {
822 Ok(()) => true,
823 Err(err) => {
824 if matches!(err, mpsc::error::TrySendError::Full(_)) {
825 debug!(
826 target: "txpool",
827 "[{:?}] failed to send blob sidecar; channel full",
828 sidecar,
829 );
830 true
831 } else {
832 false
833 }
834 }
835 }
836 })
837 }
838
839 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
841 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
842
843 let mut needs_pending_cleanup = false;
845 {
846 let listeners = self.pending_transaction_listener.read();
847 for listener in listeners.iter() {
848 if !listener.send_all(outcome.pending_transactions(listener.kind)) {
849 needs_pending_cleanup = true;
850 }
851 }
852 }
853 if needs_pending_cleanup {
854 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
855 }
856
857 let mut needs_tx_cleanup = false;
859 {
860 let listeners = self.transaction_listener.read();
861 for listener in listeners.iter() {
862 if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
863 needs_tx_cleanup = true;
864 }
865 }
866 }
867 if needs_tx_cleanup {
868 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
869 }
870
871 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
872
873 self.with_event_listener(|listener| {
875 for tx in &mined {
876 listener.mined(tx, block_hash);
877 }
878 for tx in &promoted {
879 listener.pending(tx.hash(), None);
880 }
881 for tx in &discarded {
882 listener.discarded(tx.hash());
883 }
884 })
885 }
886
887 #[allow(clippy::type_complexity)]
896 pub fn notify_on_transaction_updates(
897 &self,
898 promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
899 discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
900 ) {
901 if !promoted.is_empty() {
903 let mut needs_pending_cleanup = false;
904 {
905 let listeners = self.pending_transaction_listener.read();
906 for listener in listeners.iter() {
907 let promoted_hashes = promoted.iter().filter_map(|tx| {
908 if listener.kind.is_propagate_only() && !tx.propagate {
909 None
910 } else {
911 Some(*tx.hash())
912 }
913 });
914 if !listener.send_all(promoted_hashes) {
915 needs_pending_cleanup = true;
916 }
917 }
918 }
919 if needs_pending_cleanup {
920 self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
921 }
922
923 let mut needs_tx_cleanup = false;
925 {
926 let listeners = self.transaction_listener.read();
927 for listener in listeners.iter() {
928 let promoted_txs = promoted.iter().filter_map(|tx| {
929 if listener.kind.is_propagate_only() && !tx.propagate {
930 None
931 } else {
932 Some(NewTransactionEvent::pending(tx.clone()))
933 }
934 });
935 if !listener.send_all(promoted_txs) {
936 needs_tx_cleanup = true;
937 }
938 }
939 }
940 if needs_tx_cleanup {
941 self.transaction_listener.write().retain(|l| !l.sender.is_closed());
942 }
943 }
944
945 self.with_event_listener(|listener| {
946 for tx in &promoted {
947 listener.pending(tx.hash(), None);
948 }
949 for tx in &discarded {
950 listener.discarded(tx.hash());
951 }
952 });
953
954 if !discarded.is_empty() {
955 self.delete_discarded_blobs(discarded.iter());
958 }
959 }
960
961 pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
970 self.with_event_listener(|listener| match tx {
971 AddedTransaction::Pending(tx) => {
972 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
973
974 listener.pending(transaction.hash(), replaced.clone());
975 for tx in promoted {
976 listener.pending(tx.hash(), None);
977 }
978 for tx in discarded {
979 listener.discarded(tx.hash());
980 }
981 }
982 AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
983 listener.queued(transaction.hash(), queued_reason.clone());
984 if let Some(replaced) = replaced {
985 listener.replaced(replaced.clone(), *transaction.hash());
986 }
987 }
988 });
989 }
990
991 pub fn best_transactions(&self) -> BestTransactions<T> {
993 self.get_pool_data().best_transactions()
994 }
995
996 pub fn best_transactions_with_attributes(
999 &self,
1000 best_transactions_attributes: BestTransactionsAttributes,
1001 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1002 {
1003 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1004 }
1005
1006 pub fn pending_transactions_max(
1008 &self,
1009 max: usize,
1010 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1011 self.get_pool_data().pending_transactions_iter().take(max).collect()
1012 }
1013
1014 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1016 self.get_pool_data().pending_transactions()
1017 }
1018
1019 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1021 self.get_pool_data().queued_transactions()
1022 }
1023
1024 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1026 let pool = self.get_pool_data();
1027 AllPoolTransactions {
1028 pending: pool.pending_transactions(),
1029 queued: pool.queued_transactions(),
1030 }
1031 }
1032
1033 pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1035 self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1036 }
1037
1038 pub fn remove_transactions(
1043 &self,
1044 hashes: Vec<TxHash>,
1045 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1046 if hashes.is_empty() {
1047 return Vec::new()
1048 }
1049 let removed = self.pool.write().remove_transactions(hashes);
1050
1051 self.with_event_listener(|listener| listener.discarded_many(&removed));
1052
1053 removed
1054 }
1055
1056 pub fn remove_transactions_and_descendants(
1059 &self,
1060 hashes: Vec<TxHash>,
1061 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1062 if hashes.is_empty() {
1063 return Vec::new()
1064 }
1065 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1066
1067 self.with_event_listener(|listener| {
1068 for tx in &removed {
1069 listener.discarded(tx.hash());
1070 }
1071 });
1072
1073 removed
1074 }
1075
1076 pub fn remove_transactions_by_sender(
1078 &self,
1079 sender: Address,
1080 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1081 let sender_id = self.get_sender_id(sender);
1082 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1083
1084 self.with_event_listener(|listener| listener.discarded_many(&removed));
1085
1086 removed
1087 }
1088
1089 pub fn prune_transactions(
1094 &self,
1095 hashes: Vec<TxHash>,
1096 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1097 if hashes.is_empty() {
1098 return Vec::new()
1099 }
1100
1101 self.pool.write().prune_transactions(hashes)
1102 }
1103
1104 pub fn retain_unknown<A>(&self, announcement: &mut A)
1106 where
1107 A: HandleMempoolData,
1108 {
1109 if announcement.is_empty() {
1110 return
1111 }
1112 let pool = self.get_pool_data();
1113 announcement.retain_by_hash(|tx| !pool.contains(tx))
1114 }
1115
1116 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1118 self.get_pool_data().get(tx_hash)
1119 }
1120
1121 pub fn get_transactions_by_sender(
1123 &self,
1124 sender: Address,
1125 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1126 let sender_id = self.get_sender_id(sender);
1127 self.get_pool_data().get_transactions_by_sender(sender_id)
1128 }
1129
1130 pub fn get_pending_transaction_by_sender_and_nonce(
1132 &self,
1133 sender: Address,
1134 nonce: u64,
1135 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1136 let sender_id = self.get_sender_id(sender);
1137 self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1138 }
1139
1140 pub fn get_queued_transactions_by_sender(
1142 &self,
1143 sender: Address,
1144 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1145 let sender_id = self.get_sender_id(sender);
1146 self.get_pool_data().queued_txs_by_sender(sender_id)
1147 }
1148
1149 pub fn pending_transactions_with_predicate(
1151 &self,
1152 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1153 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1154 self.get_pool_data().pending_transactions_with_predicate(predicate)
1155 }
1156
1157 pub fn get_pending_transactions_by_sender(
1159 &self,
1160 sender: Address,
1161 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1162 let sender_id = self.get_sender_id(sender);
1163 self.get_pool_data().pending_txs_by_sender(sender_id)
1164 }
1165
1166 pub fn get_highest_transaction_by_sender(
1168 &self,
1169 sender: Address,
1170 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1171 let sender_id = self.get_sender_id(sender);
1172 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1173 }
1174
1175 pub fn get_highest_consecutive_transaction_by_sender(
1177 &self,
1178 sender: Address,
1179 on_chain_nonce: u64,
1180 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1181 let sender_id = self.get_sender_id(sender);
1182 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1183 sender_id.into_transaction_id(on_chain_nonce),
1184 )
1185 }
1186
1187 pub fn get_transaction_by_transaction_id(
1189 &self,
1190 transaction_id: &TransactionId,
1191 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1192 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1193 }
1194
1195 pub fn get_transactions_by_origin(
1197 &self,
1198 origin: TransactionOrigin,
1199 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1200 self.get_pool_data()
1201 .all()
1202 .transactions_iter()
1203 .filter(|tx| tx.origin == origin)
1204 .cloned()
1205 .collect()
1206 }
1207
1208 pub fn get_pending_transactions_by_origin(
1210 &self,
1211 origin: TransactionOrigin,
1212 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1213 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1214 }
1215
1216 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1220 if txs.is_empty() {
1221 return Vec::new()
1222 }
1223 self.get_pool_data().get_all(txs).collect()
1224 }
1225
1226 fn get_all_propagatable(
1230 &self,
1231 txs: &[TxHash],
1232 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1233 if txs.is_empty() {
1234 return Vec::new()
1235 }
1236 let pool = self.get_pool_data();
1237 txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1238 }
1239
1240 pub fn on_propagated(&self, txs: PropagatedTransactions) {
1242 if txs.is_empty() {
1243 return
1244 }
1245 self.with_event_listener(|listener| {
1246 txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1247 });
1248 }
1249
1250 pub fn len(&self) -> usize {
1252 self.get_pool_data().len()
1253 }
1254
1255 pub fn is_empty(&self) -> bool {
1257 self.get_pool_data().is_empty()
1258 }
1259
1260 pub fn is_exceeded(&self) -> bool {
1262 self.pool.read().is_exceeded()
1263 }
1264
1265 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1267 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1268 if let Err(err) = self.blob_store.insert(hash, blob) {
1269 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1270 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1271 }
1272 self.update_blob_store_metrics();
1273 }
1274
1275 pub fn delete_blob(&self, blob: TxHash) {
1277 let _ = self.blob_store.delete(blob);
1278 }
1279
1280 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1282 let _ = self.blob_store.delete_all(txs);
1283 }
1284
1285 pub fn cleanup_blobs(&self) {
1287 let stat = self.blob_store.cleanup();
1288 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1289 self.update_blob_store_metrics();
1290 }
1291
1292 fn update_blob_store_metrics(&self) {
1293 if let Some(data_size) = self.blob_store.data_size_hint() {
1294 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1295 }
1296 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1297 }
1298
1299 fn delete_discarded_blobs<'a>(
1301 &'a self,
1302 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1303 ) {
1304 let blob_txs = transactions
1305 .into_iter()
1306 .filter(|tx| tx.transaction.is_eip4844())
1307 .map(|tx| *tx.hash())
1308 .collect();
1309 self.delete_blobs(blob_txs);
1310 }
1311}
1312
1313impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1315 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1316 }
1317}
1318
1319#[derive(Debug)]
1324struct AddedTransactionMeta<T: PoolTransaction> {
1325 added: AddedTransaction<T>,
1327 blob_sidecar: Option<BlobTransactionSidecarVariant>,
1329}
1330
1331#[derive(Debug, Clone)]
1333pub struct AddedPendingTransaction<T: PoolTransaction> {
1334 pub transaction: Arc<ValidPoolTransaction<T>>,
1336 pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1338 pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1340 pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1342}
1343
1344impl<T: PoolTransaction> AddedPendingTransaction<T> {
1345 pub(crate) fn pending_transactions(
1351 &self,
1352 kind: TransactionListenerKind,
1353 ) -> impl Iterator<Item = B256> + '_ {
1354 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1355 PendingTransactionIter { kind, iter }
1356 }
1357}
1358
1359pub(crate) struct PendingTransactionIter<Iter> {
1360 kind: TransactionListenerKind,
1361 iter: Iter,
1362}
1363
1364impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1365where
1366 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1367 T: PoolTransaction + 'a,
1368{
1369 type Item = B256;
1370
1371 fn next(&mut self) -> Option<Self::Item> {
1372 loop {
1373 let next = self.iter.next()?;
1374 if self.kind.is_propagate_only() && !next.propagate {
1375 continue
1376 }
1377 return Some(*next.hash())
1378 }
1379 }
1380}
1381
1382pub(crate) struct FullPendingTransactionIter<Iter> {
1384 kind: TransactionListenerKind,
1385 iter: Iter,
1386}
1387
1388impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1389where
1390 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1391 T: PoolTransaction + 'a,
1392{
1393 type Item = NewTransactionEvent<T>;
1394
1395 fn next(&mut self) -> Option<Self::Item> {
1396 loop {
1397 let next = self.iter.next()?;
1398 if self.kind.is_propagate_only() && !next.propagate {
1399 continue
1400 }
1401 return Some(NewTransactionEvent {
1402 subpool: SubPool::Pending,
1403 transaction: next.clone(),
1404 })
1405 }
1406 }
1407}
1408
1409#[derive(Debug, Clone)]
1411pub enum AddedTransaction<T: PoolTransaction> {
1412 Pending(AddedPendingTransaction<T>),
1414 Parked {
1417 transaction: Arc<ValidPoolTransaction<T>>,
1419 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1421 subpool: SubPool,
1423 queued_reason: Option<QueuedReason>,
1425 },
1426}
1427
1428impl<T: PoolTransaction> AddedTransaction<T> {
1429 pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1431 match self {
1432 Self::Pending(tx) => Some(tx),
1433 _ => None,
1434 }
1435 }
1436
1437 pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1439 match self {
1440 Self::Pending(tx) => tx.replaced.as_ref(),
1441 Self::Parked { replaced, .. } => replaced.as_ref(),
1442 }
1443 }
1444
1445 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1447 match self {
1448 Self::Pending(tx) => Some(&tx.discarded),
1449 Self::Parked { .. } => None,
1450 }
1451 }
1452
1453 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1455 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1456 }
1457
1458 pub fn hash(&self) -> &TxHash {
1460 match self {
1461 Self::Pending(tx) => tx.transaction.hash(),
1462 Self::Parked { transaction, .. } => transaction.hash(),
1463 }
1464 }
1465
1466 pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1468 match self {
1469 Self::Pending(tx) => {
1470 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1471 }
1472 Self::Parked { transaction, subpool, .. } => {
1473 NewTransactionEvent { transaction, subpool }
1474 }
1475 }
1476 }
1477
1478 pub(crate) const fn subpool(&self) -> SubPool {
1480 match self {
1481 Self::Pending(_) => SubPool::Pending,
1482 Self::Parked { subpool, .. } => *subpool,
1483 }
1484 }
1485
1486 #[cfg(test)]
1488 pub(crate) fn id(&self) -> &TransactionId {
1489 match self {
1490 Self::Pending(added) => added.transaction.id(),
1491 Self::Parked { transaction, .. } => transaction.id(),
1492 }
1493 }
1494
1495 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1497 match self {
1498 Self::Pending(_) => None,
1499 Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1500 }
1501 }
1502
1503 pub fn transaction_state(&self) -> AddedTransactionState {
1505 match self.subpool() {
1506 SubPool::Pending => AddedTransactionState::Pending,
1507 _ => {
1508 if let Some(reason) = self.queued_reason() {
1511 AddedTransactionState::Queued(reason.clone())
1512 } else {
1513 AddedTransactionState::Queued(QueuedReason::NonceGap)
1515 }
1516 }
1517 }
1518 }
1519}
1520
1521#[derive(Debug, Clone, PartialEq, Eq)]
1523pub enum QueuedReason {
1524 NonceGap,
1526 ParkedAncestors,
1528 InsufficientBalance,
1530 TooMuchGas,
1532 InsufficientBaseFee,
1534 InsufficientBlobFee,
1536}
1537
1538#[derive(Debug, Clone, PartialEq, Eq)]
1540pub enum AddedTransactionState {
1541 Pending,
1543 Queued(QueuedReason),
1545}
1546
1547impl AddedTransactionState {
1548 pub const fn is_queued(&self) -> bool {
1550 matches!(self, Self::Queued(_))
1551 }
1552
1553 pub const fn is_pending(&self) -> bool {
1555 matches!(self, Self::Pending)
1556 }
1557
1558 pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1560 match self {
1561 Self::Queued(reason) => Some(reason),
1562 Self::Pending => None,
1563 }
1564 }
1565}
1566
1567#[derive(Debug, Clone, PartialEq, Eq)]
1569pub struct AddedTransactionOutcome {
1570 pub hash: TxHash,
1572 pub state: AddedTransactionState,
1574}
1575
1576impl AddedTransactionOutcome {
1577 pub const fn is_queued(&self) -> bool {
1579 self.state.is_queued()
1580 }
1581
1582 pub const fn is_pending(&self) -> bool {
1584 self.state.is_pending()
1585 }
1586}
1587
1588#[derive(Debug)]
1590pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1591 pub(crate) block_hash: B256,
1593 pub(crate) mined: Vec<TxHash>,
1595 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1597 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1599}
1600
1601impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1602 pub(crate) fn pending_transactions(
1608 &self,
1609 kind: TransactionListenerKind,
1610 ) -> impl Iterator<Item = B256> + '_ {
1611 let iter = self.promoted.iter();
1612 PendingTransactionIter { kind, iter }
1613 }
1614
1615 pub(crate) fn full_pending_transactions(
1621 &self,
1622 kind: TransactionListenerKind,
1623 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1624 let iter = self.promoted.iter();
1625 FullPendingTransactionIter { kind, iter }
1626 }
1627}
1628
1629#[cfg(test)]
1630mod tests {
1631 use crate::{
1632 blobstore::{BlobStore, InMemoryBlobStore},
1633 identifier::SenderId,
1634 test_utils::{MockTransaction, TestPoolBuilder},
1635 validate::ValidTransaction,
1636 BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1637 };
1638 use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1639 use alloy_primitives::Address;
1640 use std::{fs, path::PathBuf};
1641
1642 #[test]
1643 fn test_discard_blobs_on_blob_tx_eviction() {
1644 let blobs = {
1645 let json_content = fs::read_to_string(
1647 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1648 )
1649 .expect("Failed to read the blob data file");
1650
1651 let json_value: serde_json::Value =
1653 serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1654
1655 vec![
1657 json_value
1659 .get("data")
1660 .unwrap()
1661 .as_str()
1662 .expect("Data is not a valid string")
1663 .to_string(),
1664 ]
1665 };
1666
1667 let sidecar = BlobTransactionSidecarVariant::Eip4844(
1669 BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1670 );
1671
1672 let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1674
1675 let test_pool = &TestPoolBuilder::default()
1677 .with_config(PoolConfig { blob_limit, ..Default::default() })
1678 .pool;
1679
1680 test_pool
1682 .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1683
1684 let blob_store = InMemoryBlobStore::default();
1686
1687 for n in 0..blob_limit.max_txs + 10 {
1689 let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1691
1692 tx.set_size(1844674407370951);
1694
1695 if n < blob_limit.max_txs {
1697 blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1698 }
1699
1700 test_pool.add_transactions(
1702 TransactionOrigin::External,
1703 [TransactionValidationOutcome::Valid {
1704 balance: U256::from(1_000),
1705 state_nonce: 0,
1706 bytecode_hash: None,
1707 transaction: ValidTransaction::ValidWithSidecar {
1708 transaction: tx,
1709 sidecar: sidecar.clone(),
1710 },
1711 propagate: true,
1712 authorities: None,
1713 }],
1714 );
1715 }
1716
1717 assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1719
1720 assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1722
1723 assert_eq!(*test_pool.blob_store(), blob_store);
1725 }
1726
1727 #[test]
1728 fn test_auths_stored_in_identifiers() {
1729 let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1731
1732 let auth = Address::new([1; 20]);
1733 let tx = MockTransaction::eip7702();
1734
1735 test_pool.add_transactions(
1736 TransactionOrigin::Local,
1737 [TransactionValidationOutcome::Valid {
1738 balance: U256::from(1_000),
1739 state_nonce: 0,
1740 bytecode_hash: None,
1741 transaction: ValidTransaction::Valid(tx),
1742 propagate: true,
1743 authorities: Some(vec![auth]),
1744 }],
1745 );
1746
1747 let identifiers = test_pool.identifiers.read();
1748 assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1749 }
1750}