1#![doc(
148 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
149 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
150 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
151)]
152#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
153#![cfg_attr(not(test), warn(unused_crate_dependencies))]
154
155pub use crate::{
156 blobstore::{BlobStore, BlobStoreError},
157 config::{
158 LocalTransactionConfig, PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP,
159 DEFAULT_TXPOOL_ADDITIONAL_VALIDATION_TASKS, MAX_NEW_PENDING_TXS_NOTIFICATIONS,
160 REPLACE_BLOB_PRICE_BUMP, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
161 TXPOOL_SUBPOOL_MAX_SIZE_MB_DEFAULT, TXPOOL_SUBPOOL_MAX_TXS_DEFAULT,
162 },
163 error::PoolResult,
164 ordering::{CoinbaseTipOrdering, Priority, TransactionOrdering},
165 pool::{
166 blob_tx_priority, fee_delta, state::SubPool, AllTransactionsEvents, FullTransactionEvent,
167 NewTransactionEvent, TransactionEvent, TransactionEvents, TransactionListenerKind,
168 },
169 traits::*,
170 validate::{
171 EthTransactionValidator, TransactionValidationOutcome, TransactionValidationTaskExecutor,
172 TransactionValidator, ValidPoolTransaction,
173 },
174};
175use crate::{identifier::TransactionId, pool::PoolInner};
176use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
177use alloy_primitives::{Address, TxHash, B256, U256};
178use aquamarine as _;
179use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
180use reth_eth_wire_types::HandleMempoolData;
181use reth_execution_types::ChangedAccount;
182use reth_primitives_traits::{Block, Recovered};
183use reth_storage_api::StateProviderFactory;
184use std::{collections::HashSet, sync::Arc};
185use tokio::sync::mpsc::Receiver;
186use tracing::{instrument, trace};
187
188pub mod error;
189pub mod maintain;
190pub mod metrics;
191pub mod noop;
192pub mod pool;
193pub mod validate;
194
195pub mod blobstore;
196mod config;
197pub mod identifier;
198mod ordering;
199mod traits;
200
201#[cfg(any(test, feature = "test-utils"))]
202pub mod test_utils;
204
205pub type EthTransactionPool<Client, S> = Pool<
207 TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
208 CoinbaseTipOrdering<EthPooledTransaction>,
209 S,
210>;
211
212#[derive(Debug)]
214pub struct Pool<V, T: TransactionOrdering, S> {
215 pool: Arc<PoolInner<V, T, S>>,
217}
218
219impl<V, T, S> Pool<V, T, S>
222where
223 V: TransactionValidator,
224 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
225 S: BlobStore,
226{
227 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
229 Self { pool: Arc::new(PoolInner::new(validator, ordering, blob_store, config)) }
230 }
231
232 pub(crate) fn inner(&self) -> &PoolInner<V, T, S> {
234 &self.pool
235 }
236
237 pub fn config(&self) -> &PoolConfig {
239 self.inner().config()
240 }
241
242 async fn validate_all(
246 &self,
247 origin: TransactionOrigin,
248 transactions: impl IntoIterator<Item = V::Transaction>,
249 ) -> Vec<(TxHash, TransactionValidationOutcome<V::Transaction>)> {
250 futures_util::future::join_all(transactions.into_iter().map(|tx| self.validate(origin, tx)))
251 .await
252 }
253
254 async fn validate(
256 &self,
257 origin: TransactionOrigin,
258 transaction: V::Transaction,
259 ) -> (TxHash, TransactionValidationOutcome<V::Transaction>) {
260 let hash = *transaction.hash();
261
262 let outcome = self.pool.validator().validate_transaction(origin, transaction).await;
263
264 (hash, outcome)
265 }
266
267 pub fn len(&self) -> usize {
269 self.pool.len()
270 }
271
272 pub fn is_empty(&self) -> bool {
274 self.pool.is_empty()
275 }
276
277 pub fn is_exceeded(&self) -> bool {
279 self.pool.is_exceeded()
280 }
281
282 pub fn blob_store(&self) -> &S {
284 self.pool.blob_store()
285 }
286}
287
288impl<Client, S> EthTransactionPool<Client, S>
289where
290 Client:
291 ChainSpecProvider<ChainSpec: EthereumHardforks> + StateProviderFactory + Clone + 'static,
292 S: BlobStore,
293{
294 pub fn eth_pool(
322 validator: TransactionValidationTaskExecutor<
323 EthTransactionValidator<Client, EthPooledTransaction>,
324 >,
325 blob_store: S,
326 config: PoolConfig,
327 ) -> Self {
328 Self::new(validator, CoinbaseTipOrdering::default(), blob_store, config)
329 }
330}
331
332impl<V, T, S> TransactionPool for Pool<V, T, S>
334where
335 V: TransactionValidator,
336 <V as TransactionValidator>::Transaction: EthPoolTransaction,
337 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
338 S: BlobStore,
339{
340 type Transaction = T::Transaction;
341
342 fn pool_size(&self) -> PoolSize {
343 self.pool.size()
344 }
345
346 fn block_info(&self) -> BlockInfo {
347 self.pool.block_info()
348 }
349
350 async fn add_transaction_and_subscribe(
351 &self,
352 origin: TransactionOrigin,
353 transaction: Self::Transaction,
354 ) -> PoolResult<TransactionEvents> {
355 let (_, tx) = self.validate(origin, transaction).await;
356 self.pool.add_transaction_and_subscribe(origin, tx)
357 }
358
359 async fn add_transaction(
360 &self,
361 origin: TransactionOrigin,
362 transaction: Self::Transaction,
363 ) -> PoolResult<TxHash> {
364 let (_, tx) = self.validate(origin, transaction).await;
365 let mut results = self.pool.add_transactions(origin, std::iter::once(tx));
366 results.pop().expect("result length is the same as the input")
367 }
368
369 async fn add_transactions(
370 &self,
371 origin: TransactionOrigin,
372 transactions: Vec<Self::Transaction>,
373 ) -> Vec<PoolResult<TxHash>> {
374 if transactions.is_empty() {
375 return Vec::new()
376 }
377 let validated = self.validate_all(origin, transactions).await;
378
379 self.pool.add_transactions(origin, validated.into_iter().map(|(_, tx)| tx))
380 }
381
382 fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
383 self.pool.add_transaction_event_listener(tx_hash)
384 }
385
386 fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
387 self.pool.add_all_transactions_event_listener()
388 }
389
390 fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
391 self.pool.add_pending_listener(kind)
392 }
393
394 fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
395 self.pool.add_blob_sidecar_listener()
396 }
397
398 fn new_transactions_listener_for(
399 &self,
400 kind: TransactionListenerKind,
401 ) -> Receiver<NewTransactionEvent<Self::Transaction>> {
402 self.pool.add_new_transaction_listener(kind)
403 }
404
405 fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
406 self.pool.pooled_transactions_hashes()
407 }
408
409 fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash> {
410 self.pooled_transaction_hashes().into_iter().take(max).collect()
411 }
412
413 fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
414 self.pool.pooled_transactions()
415 }
416
417 fn pooled_transactions_max(
418 &self,
419 max: usize,
420 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
421 self.pool.pooled_transactions_max(max)
422 }
423
424 fn get_pooled_transaction_elements(
425 &self,
426 tx_hashes: Vec<TxHash>,
427 limit: GetPooledTransactionLimit,
428 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> {
429 self.pool.get_pooled_transaction_elements(tx_hashes, limit)
430 }
431
432 fn get_pooled_transaction_element(
433 &self,
434 tx_hash: TxHash,
435 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
436 {
437 self.pool.get_pooled_transaction_element(tx_hash)
438 }
439
440 fn best_transactions(
441 &self,
442 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
443 Box::new(self.pool.best_transactions())
444 }
445
446 fn best_transactions_with_attributes(
447 &self,
448 best_transactions_attributes: BestTransactionsAttributes,
449 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
450 self.pool.best_transactions_with_attributes(best_transactions_attributes)
451 }
452
453 fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
454 self.pool.pending_transactions()
455 }
456
457 fn pending_transactions_max(
458 &self,
459 max: usize,
460 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
461 self.pool.pending_transactions_max(max)
462 }
463
464 fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
465 self.pool.queued_transactions()
466 }
467
468 fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction> {
469 self.pool.all_transactions()
470 }
471
472 fn remove_transactions(
473 &self,
474 hashes: Vec<TxHash>,
475 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
476 self.pool.remove_transactions(hashes)
477 }
478
479 fn remove_transactions_and_descendants(
480 &self,
481 hashes: Vec<TxHash>,
482 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
483 self.pool.remove_transactions_and_descendants(hashes)
484 }
485
486 fn remove_transactions_by_sender(
487 &self,
488 sender: Address,
489 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
490 self.pool.remove_transactions_by_sender(sender)
491 }
492
493 fn retain_unknown<A>(&self, announcement: &mut A)
494 where
495 A: HandleMempoolData,
496 {
497 self.pool.retain_unknown(announcement)
498 }
499
500 fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
501 self.inner().get(tx_hash)
502 }
503
504 fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
505 self.inner().get_all(txs)
506 }
507
508 fn on_propagated(&self, txs: PropagatedTransactions) {
509 self.inner().on_propagated(txs)
510 }
511
512 fn get_transactions_by_sender(
513 &self,
514 sender: Address,
515 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
516 self.pool.get_transactions_by_sender(sender)
517 }
518
519 fn get_pending_transactions_with_predicate(
520 &self,
521 predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
522 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
523 self.pool.pending_transactions_with_predicate(predicate)
524 }
525
526 fn get_pending_transactions_by_sender(
527 &self,
528 sender: Address,
529 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
530 self.pool.get_pending_transactions_by_sender(sender)
531 }
532
533 fn get_queued_transactions_by_sender(
534 &self,
535 sender: Address,
536 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
537 self.pool.get_queued_transactions_by_sender(sender)
538 }
539
540 fn get_highest_transaction_by_sender(
541 &self,
542 sender: Address,
543 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
544 self.pool.get_highest_transaction_by_sender(sender)
545 }
546
547 fn get_highest_consecutive_transaction_by_sender(
548 &self,
549 sender: Address,
550 on_chain_nonce: u64,
551 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
552 self.pool.get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce)
553 }
554
555 fn get_transaction_by_sender_and_nonce(
556 &self,
557 sender: Address,
558 nonce: u64,
559 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
560 let transaction_id = TransactionId::new(self.pool.get_sender_id(sender), nonce);
561
562 self.inner().get_pool_data().all().get(&transaction_id).map(|tx| tx.transaction.clone())
563 }
564
565 fn get_transactions_by_origin(
566 &self,
567 origin: TransactionOrigin,
568 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
569 self.pool.get_transactions_by_origin(origin)
570 }
571
572 fn get_pending_transactions_by_origin(
574 &self,
575 origin: TransactionOrigin,
576 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
577 self.pool.get_pending_transactions_by_origin(origin)
578 }
579
580 fn unique_senders(&self) -> HashSet<Address> {
581 self.pool.unique_senders()
582 }
583
584 fn get_blob(
585 &self,
586 tx_hash: TxHash,
587 ) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
588 self.pool.blob_store().get(tx_hash)
589 }
590
591 fn get_all_blobs(
592 &self,
593 tx_hashes: Vec<TxHash>,
594 ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
595 self.pool.blob_store().get_all(tx_hashes)
596 }
597
598 fn get_all_blobs_exact(
599 &self,
600 tx_hashes: Vec<TxHash>,
601 ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
602 self.pool.blob_store().get_exact(tx_hashes)
603 }
604
605 fn get_blobs_for_versioned_hashes(
606 &self,
607 versioned_hashes: &[B256],
608 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
609 self.pool.blob_store().get_by_versioned_hashes(versioned_hashes)
610 }
611}
612
613impl<V, T, S> TransactionPoolExt for Pool<V, T, S>
614where
615 V: TransactionValidator,
616 <V as TransactionValidator>::Transaction: EthPoolTransaction,
617 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
618 S: BlobStore,
619{
620 #[instrument(skip(self), target = "txpool")]
621 fn set_block_info(&self, info: BlockInfo) {
622 trace!(target: "txpool", "updating pool block info");
623 self.pool.set_block_info(info)
624 }
625
626 fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
627 where
628 B: Block,
629 {
630 self.pool.on_canonical_state_change(update);
631 }
632
633 fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
634 self.pool.update_accounts(accounts);
635 }
636
637 fn delete_blob(&self, tx: TxHash) {
638 self.pool.delete_blob(tx)
639 }
640
641 fn delete_blobs(&self, txs: Vec<TxHash>) {
642 self.pool.delete_blobs(txs)
643 }
644
645 fn cleanup_blobs(&self) {
646 self.pool.cleanup_blobs()
647 }
648}
649
650impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
651 fn clone(&self) -> Self {
652 Self { pool: Arc::clone(&self.pool) }
653 }
654}