reth_transaction_pool/traits.rs
1use crate::{
2 blobstore::BlobStoreError,
3 error::{InvalidPoolTransactionError, PoolResult},
4 pool::{
5 state::SubPool, BestTransactionFilter, NewTransactionEvent, TransactionEvents,
6 TransactionListenerKind,
7 },
8 validate::ValidPoolTransaction,
9 AllTransactionsEvents,
10};
11use alloy_consensus::{
12 error::ValueError, transaction::PooledTransaction, BlockHeader, Signed, Typed2718,
13};
14use alloy_eips::{
15 eip2718::Encodable2718,
16 eip2930::AccessList,
17 eip4844::{
18 env_settings::KzgSettings, BlobAndProofV1, BlobTransactionSidecar,
19 BlobTransactionValidationError,
20 },
21 eip7702::SignedAuthorization,
22};
23use alloy_primitives::{Address, Bytes, TxHash, TxKind, B256, U256};
24use futures_util::{ready, Stream};
25use reth_eth_wire_types::HandleMempoolData;
26use reth_ethereum_primitives::TransactionSigned;
27use reth_execution_types::ChangedAccount;
28use reth_primitives_traits::{Block, InMemorySize, Recovered, SealedBlock, SignedTransaction};
29#[cfg(feature = "serde")]
30use serde::{Deserialize, Serialize};
31use std::{
32 collections::{HashMap, HashSet},
33 fmt,
34 fmt::Debug,
35 future::Future,
36 pin::Pin,
37 sync::Arc,
38 task::{Context, Poll},
39};
40use tokio::sync::mpsc::Receiver;
41
/// The `PeerId` type.
///
/// Alias for [`alloy_primitives::B512`]; used in this module to identify the peer a
/// transaction was propagated to (see [`PropagateKind`]).
pub type PeerId = alloy_primitives::B512;
44
/// Helper type alias to access [`PoolTransaction`] for a given [`TransactionPool`].
///
/// Resolves to the pool's associated `Transaction` type.
pub type PoolTx<P> = <P as TransactionPool>::Transaction;
/// Helper type alias to access [`PoolTransaction::Consensus`] for a given [`TransactionPool`].
///
/// Resolves to the `Consensus` associated type of the pool's transaction type.
pub type PoolConsensusTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Consensus;

/// Helper type alias to access [`PoolTransaction::Pooled`] for a given [`TransactionPool`].
///
/// Resolves to the `Pooled` associated type of the pool's transaction type.
pub type PoolPooledTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Pooled;
52
/// General purpose abstraction of a transaction-pool.
///
/// This is intended to be used by API-consumers such as RPC that need to inject new incoming,
/// unverified transactions. And by block production that needs to get transactions to execute in a
/// new block.
///
/// Note: This requires `Clone` for convenience, since it is assumed that this will be implemented
/// for a wrapped `Arc` type, see also [`Pool`](crate::Pool).
#[auto_impl::auto_impl(&, Arc)]
pub trait TransactionPool: Clone + Debug + Send + Sync {
    /// The transaction type of the pool
    type Transaction: EthPoolTransaction;

    /// Returns stats about the pool and all sub-pools.
    fn pool_size(&self) -> PoolSize;

    /// Returns the block the pool is currently tracking.
    ///
    /// This tracks the block that the pool has last seen.
    fn block_info(&self) -> BlockInfo;

    /// Imports an _external_ transaction.
    ///
    /// This is intended to be used by the network to insert incoming transactions received over the
    /// p2p network.
    ///
    /// The default implementation forwards to [`TransactionPool::add_transaction`] with
    /// [`TransactionOrigin::External`].
    ///
    /// Consumer: P2P
    fn add_external_transaction(
        &self,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TxHash>> + Send {
        self.add_transaction(TransactionOrigin::External, transaction)
    }

    /// Imports all _external_ transactions.
    ///
    /// The default implementation forwards to [`TransactionPool::add_transactions`] with
    /// [`TransactionOrigin::External`].
    ///
    /// Consumer: Utility
    fn add_external_transactions(
        &self,
        transactions: Vec<Self::Transaction>,
    ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send {
        self.add_transactions(TransactionOrigin::External, transactions)
    }

    /// Adds an _unvalidated_ transaction into the pool and subscribes to state changes.
    ///
    /// This is the same as [`TransactionPool::add_transaction`] but returns an event stream for the
    /// given transaction.
    ///
    /// Consumer: Custom
    fn add_transaction_and_subscribe(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TransactionEvents>> + Send;

    /// Adds an _unvalidated_ transaction into the pool.
    ///
    /// Consumer: RPC
    fn add_transaction(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TxHash>> + Send;

    /// Adds the given _unvalidated_ transactions into the pool.
    ///
    /// Returns a list of results.
    ///
    /// Consumer: RPC
    fn add_transactions(
        &self,
        origin: TransactionOrigin,
        transactions: Vec<Self::Transaction>,
    ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send;

    /// Returns a new transaction change event stream for the given transaction.
    ///
    /// Returns `None` if the transaction is not in the pool.
    fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents>;

    /// Returns a new transaction change event stream for _all_ transactions in the pool.
    fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction>;

    /// Returns a new [Receiver] that yields transaction hashes for new __pending__ transactions
    /// inserted into the pool that are allowed to be propagated.
    ///
    /// Note: This is intended for networking and will __only__ yield transactions that are allowed
    /// to be propagated over the network, see also [TransactionListenerKind].
    ///
    /// The default implementation forwards to [Self::pending_transactions_listener_for] with
    /// [TransactionListenerKind::PropagateOnly].
    ///
    /// Consumer: RPC/P2P
    fn pending_transactions_listener(&self) -> Receiver<TxHash> {
        self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
    }

    /// Returns a new [Receiver] that yields transaction hashes for new __pending__ transactions
    /// inserted into the pending pool depending on the given [TransactionListenerKind] argument.
    fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;

    /// Returns a new [Receiver] that yields new valid transactions added to the pool.
    ///
    /// The default implementation forwards to [Self::new_transactions_listener_for] with
    /// [TransactionListenerKind::PropagateOnly].
    fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
        self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
    }

    /// Returns a new [Receiver] that yields blob "sidecars" (blobs w/ assoc. kzg
    /// commitments/proofs) for eip-4844 transactions inserted into the pool
    fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar>;

    /// Returns a new [Receiver] that yields new valid transactions added to the pool
    /// depending on the given [TransactionListenerKind] argument.
    fn new_transactions_listener_for(
        &self,
        kind: TransactionListenerKind,
    ) -> Receiver<NewTransactionEvent<Self::Transaction>>;

    /// Returns a new Stream that yields new transactions added to the pending sub-pool.
    ///
    /// This is a convenience wrapper around [Self::new_transactions_listener_for] (called with
    /// [TransactionListenerKind::PropagateOnly]) that filters for
    /// [SubPool::Pending](crate::SubPool).
    fn new_pending_pool_transactions_listener(
        &self,
    ) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(
            self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly),
            SubPool::Pending,
        )
    }

    /// Returns a new Stream that yields new transactions added to the basefee sub-pool.
    ///
    /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
    /// [SubPool::BaseFee](crate::SubPool).
    fn new_basefee_pool_transactions_listener(
        &self,
    ) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::BaseFee)
    }

    /// Returns a new Stream that yields new transactions added to the queued-pool.
    ///
    /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
    /// [SubPool::Queued](crate::SubPool).
    fn new_queued_transactions_listener(&self) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Queued)
    }

    /// Returns the _hashes_ of all transactions in the pool.
    ///
    /// Note: This returns a `Vec` but should guarantee that all hashes are unique.
    ///
    /// Consumer: P2P
    fn pooled_transaction_hashes(&self) -> Vec<TxHash>;

    /// Returns only the first `max` hashes of transactions in the pool.
    ///
    /// Consumer: P2P
    fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash>;

    /// Returns the _full_ transaction objects of all transactions in the pool.
    ///
    /// This is intended to be used by the network for the initial exchange of pooled transaction
    /// _hashes_
    ///
    /// Note: This returns a `Vec` but should guarantee that all transactions are unique.
    ///
    /// Caution: In case of blob transactions, this does not include the sidecar.
    ///
    /// Consumer: P2P
    fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns only the first `max` transactions in the pool.
    ///
    /// Consumer: P2P
    fn pooled_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns converted [PooledTransaction] for the given transaction hashes.
    ///
    /// This adheres to the expected behavior of
    /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
    ///
    /// The transactions must be in same order as in the request, but it is OK to skip transactions
    /// which are not available.
    ///
    /// If the transaction is a blob transaction, the sidecar will be included.
    ///
    /// Consumer: P2P
    fn get_pooled_transaction_elements(
        &self,
        tx_hashes: Vec<TxHash>,
        limit: GetPooledTransactionLimit,
    ) -> Vec<<Self::Transaction as PoolTransaction>::Pooled>;

    /// Returns the pooled transaction variant for the given transaction hash.
    ///
    /// This adheres to the expected behavior of
    /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
    ///
    /// If the transaction is a blob transaction, the sidecar will be included.
    ///
    /// It is expected that this variant represents the valid p2p format for full transactions.
    /// E.g. for EIP-4844 transactions this is the consensus transaction format with the blob
    /// sidecar.
    ///
    /// Consumer: P2P
    fn get_pooled_transaction_element(
        &self,
        tx_hash: TxHash,
    ) -> Option<Recovered<<Self::Transaction as PoolTransaction>::Pooled>>;

    /// Returns an iterator that yields transactions that are ready for block production.
    ///
    /// Consumer: Block production
    fn best_transactions(
        &self,
    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;

    /// Returns an iterator that yields transactions that are ready for block production with the
    /// given base fee and optional blob fee attributes.
    ///
    /// Consumer: Block production
    fn best_transactions_with_attributes(
        &self,
        best_transactions_attributes: BestTransactionsAttributes,
    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;

    /// Returns all transactions that can be included in the next block.
    ///
    /// This is primarily used for the `txpool_` RPC namespace:
    /// <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool> which distinguishes
    /// between `pending` and `queued` transactions, where `pending` are transactions ready for
    /// inclusion in the next block and `queued` are transactions that are ready for inclusion in
    /// future blocks.
    ///
    /// Consumer: RPC
    fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the first `max` transactions that can be included in the next block.
    /// See <https://github.com/paradigmxyz/reth/issues/12767#issuecomment-2493223579>
    ///
    /// Consumer: Block production
    fn pending_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions that can be included in _future_ blocks.
    ///
    /// This and [Self::pending_transactions] are mutually exclusive.
    ///
    /// Consumer: RPC
    fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions that are currently in the pool grouped by whether they are ready
    /// for inclusion in the next block or not.
    ///
    /// This is primarily used for the `txpool_` namespace: <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool>
    ///
    /// Consumer: RPC
    fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction>;

    /// Removes all transactions corresponding to the given hashes.
    ///
    /// Note: This removes the transactions as if they got discarded (_not_ mined).
    ///
    /// Consumer: Utility
    fn remove_transactions(
        &self,
        hashes: Vec<TxHash>,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Removes all transactions corresponding to the given hashes.
    ///
    /// Also removes all _dependent_ transactions.
    ///
    /// Consumer: Utility
    fn remove_transactions_and_descendants(
        &self,
        hashes: Vec<TxHash>,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Removes all transactions from the given sender
    ///
    /// Consumer: Utility
    fn remove_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Retains only those hashes that are unknown to the pool.
    /// In other words, removes all transactions from the given set that are currently present in
    /// the pool.
    ///
    /// This method has no return value; the `announcement` is passed by mutable reference.
    ///
    /// Consumer: P2P
    fn retain_unknown<A>(&self, announcement: &mut A)
    where
        A: HandleMempoolData;

    /// Returns if the transaction for the given hash is already included in this pool.
    ///
    /// The default implementation checks whether [`Self::get`] returns `Some`.
    fn contains(&self, tx_hash: &TxHash) -> bool {
        self.get(tx_hash).is_some()
    }

    /// Returns the transaction for the given hash.
    fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions objects for the given hashes.
    ///
    /// Caution: In case of blob transactions, this does not include the sidecar.
    fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Notify the pool about transactions that are propagated to peers.
    ///
    /// Consumer: P2P
    fn on_propagated(&self, txs: PropagatedTransactions);

    /// Returns all transactions sent by a given user
    fn get_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all pending transactions filtered by predicate
    fn get_pending_transactions_with_predicate(
        &self,
        predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all pending transactions sent by a given user
    fn get_pending_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all queued transactions sent by a given user
    fn get_queued_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the highest transaction sent by a given user
    fn get_highest_transaction_by_sender(
        &self,
        sender: Address,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
    /// In other words the highest non nonce gapped transaction.
    ///
    /// Note: The next pending pooled transaction must have the on chain nonce.
    ///
    /// For example, for a given on chain nonce of `5`, the next transaction must have that nonce.
    /// If the pool contains txs `[5,6,7]` this returns tx `7`.
    /// If the pool contains txs `[6,7]` this returns `None` because the next valid nonce (5) is
    /// missing, which means txs `[6,7]` are nonce gapped.
    fn get_highest_consecutive_transaction_by_sender(
        &self,
        sender: Address,
        on_chain_nonce: u64,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns a transaction sent by a given user and a nonce
    fn get_transaction_by_sender_and_nonce(
        &self,
        sender: Address,
        nonce: u64,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions that were submitted with the given [TransactionOrigin]
    fn get_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all pending transactions filtered by [`TransactionOrigin`]
    fn get_pending_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions that were submitted as [TransactionOrigin::Local]
    ///
    /// Default implementation delegates to [Self::get_transactions_by_origin].
    fn get_local_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::Local)
    }

    /// Returns all transactions that were submitted as [TransactionOrigin::Private]
    ///
    /// Default implementation delegates to [Self::get_transactions_by_origin].
    fn get_private_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::Private)
    }

    /// Returns all transactions that were submitted as [TransactionOrigin::External]
    ///
    /// Default implementation delegates to [Self::get_transactions_by_origin].
    fn get_external_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::External)
    }

    /// Returns all pending transactions that were submitted as [TransactionOrigin::Local]
    ///
    /// Default implementation delegates to [Self::get_pending_transactions_by_origin].
    fn get_local_pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::Local)
    }

    /// Returns all pending transactions that were submitted as [TransactionOrigin::Private]
    ///
    /// Default implementation delegates to [Self::get_pending_transactions_by_origin].
    fn get_private_pending_transactions(
        &self,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::Private)
    }

    /// Returns all pending transactions that were submitted as [TransactionOrigin::External]
    ///
    /// Default implementation delegates to [Self::get_pending_transactions_by_origin].
    fn get_external_pending_transactions(
        &self,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::External)
    }

    /// Returns a set of all senders of transactions in the pool
    fn unique_senders(&self) -> HashSet<Address>;

    /// Returns the [BlobTransactionSidecar] for the given transaction hash if it exists in the blob
    /// store.
    fn get_blob(
        &self,
        tx_hash: TxHash,
    ) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

    /// Returns all [BlobTransactionSidecar] for the given transaction hashes if they exist in the
    /// blob store.
    ///
    /// This only returns the blobs that were found in the store.
    /// If there's no blob it will not be returned.
    fn get_all_blobs(
        &self,
        tx_hashes: Vec<TxHash>,
    ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

    /// Returns the exact [BlobTransactionSidecar] for the given transaction hashes in the order
    /// they were requested.
    ///
    /// Returns an error if any of the blobs are not found in the blob store.
    fn get_all_blobs_exact(
        &self,
        tx_hashes: Vec<TxHash>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

    /// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
    fn get_blobs_for_versioned_hashes(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
}
504
/// Extension of the [TransactionPool] trait that adds maintenance operations: setting the
/// current block info, reacting to canonical state changes, updating accounts and managing
/// entries of the blob store.
#[auto_impl::auto_impl(&, Arc)]
pub trait TransactionPoolExt: TransactionPool {
    /// Sets the current block info for the pool.
    fn set_block_info(&self, info: BlockInfo);

    /// Event listener for when the pool needs to be updated.
    ///
    /// Implementers need to update the pool accordingly:
    ///
    /// ## Fee changes
    ///
    /// The [`CanonicalStateUpdate`] includes the base and blob fee of the pending block, which
    /// affects the dynamic fee requirement of pending transactions in the pool.
    ///
    /// ## EIP-4844 Blob transactions
    ///
    /// Mined blob transactions need to be removed from the pool, but from the pool only. The blob
    /// sidecar must not be removed from the blob store. Only after a blob transaction is
    /// finalized, its sidecar is removed from the blob store. This ensures that in case of a reorg,
    /// the sidecar is still available.
    fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
    where
        B: Block;

    /// Updates the accounts in the pool
    fn update_accounts(&self, accounts: Vec<ChangedAccount>);

    /// Deletes the blob sidecar for the given transaction from the blob store
    fn delete_blob(&self, tx: B256);

    /// Deletes multiple blob sidecars from the blob store
    fn delete_blobs(&self, txs: Vec<B256>);

    /// Maintenance function to cleanup blobs that are no longer needed.
    fn cleanup_blobs(&self);
}
542
543/// A Helper type that bundles all transactions in the pool.
544#[derive(Debug, Clone)]
545pub struct AllPoolTransactions<T: PoolTransaction> {
546 /// Transactions that are ready for inclusion in the next block.
547 pub pending: Vec<Arc<ValidPoolTransaction<T>>>,
548 /// Transactions that are ready for inclusion in _future_ blocks, but are currently parked,
549 /// because they depend on other transactions that are not yet included in the pool (nonce gap)
550 /// or otherwise blocked.
551 pub queued: Vec<Arc<ValidPoolTransaction<T>>>,
552}
553
554// === impl AllPoolTransactions ===
555
556impl<T: PoolTransaction> AllPoolTransactions<T> {
557 /// Returns an iterator over all pending [`Recovered`] transactions.
558 pub fn pending_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
559 self.pending.iter().map(|tx| tx.transaction.clone().into_consensus())
560 }
561
562 /// Returns an iterator over all queued [`Recovered`] transactions.
563 pub fn queued_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
564 self.queued.iter().map(|tx| tx.transaction.clone().into_consensus())
565 }
566
567 /// Returns an iterator over all transactions, both pending and queued.
568 pub fn all(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
569 self.pending
570 .iter()
571 .chain(self.queued.iter())
572 .map(|tx| tx.transaction.clone().into_consensus())
573 }
574}
575
576impl<T: PoolTransaction> Default for AllPoolTransactions<T> {
577 fn default() -> Self {
578 Self { pending: Default::default(), queued: Default::default() }
579 }
580}
581
/// Represents transactions that were propagated over the network.
///
/// Maps each transaction hash to the list of [`PropagateKind`] entries recorded for it.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct PropagatedTransactions(pub HashMap<TxHash, Vec<PropagateKind>>);
585
586/// Represents how a transaction was propagated over the network.
587#[derive(Debug, Copy, Clone, Eq, PartialEq)]
588#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
589pub enum PropagateKind {
590 /// The full transaction object was sent to the peer.
591 ///
592 /// This is equivalent to the `Transaction` message
593 Full(PeerId),
594 /// Only the Hash was propagated to the peer.
595 Hash(PeerId),
596}
597
598// === impl PropagateKind ===
599
600impl PropagateKind {
601 /// Returns the peer the transaction was sent to
602 pub const fn peer(&self) -> &PeerId {
603 match self {
604 Self::Full(peer) | Self::Hash(peer) => peer,
605 }
606 }
607
608 /// Returns true if the transaction was sent as a full transaction
609 pub const fn is_full(&self) -> bool {
610 matches!(self, Self::Full(_))
611 }
612
613 /// Returns true if the transaction was sent as a hash
614 pub const fn is_hash(&self) -> bool {
615 matches!(self, Self::Hash(_))
616 }
617}
618
619impl From<PropagateKind> for PeerId {
620 fn from(value: PropagateKind) -> Self {
621 match value {
622 PropagateKind::Full(peer) | PropagateKind::Hash(peer) => peer,
623 }
624 }
625}
626
/// This type represents a new blob sidecar that has been stored in the transaction pool's
/// blobstore; it includes the `TransactionHash` of the blob transaction along with the assoc.
/// sidecar (blobs, commitments, proofs)
#[derive(Debug, Clone)]
pub struct NewBlobSidecar {
    /// Hash of the EIP-4844 transaction.
    pub tx_hash: TxHash,
    /// The blob transaction sidecar, shared behind an [`Arc`].
    pub sidecar: Arc<BlobTransactionSidecar>,
}
637
/// Where the transaction originates from.
///
/// Depending on where the transaction was picked up, it affects how the transaction is handled
/// internally, e.g. limits for simultaneous transaction of one sender.
///
/// The default origin is [`TransactionOrigin::Local`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum TransactionOrigin {
    /// Transaction is coming from a local source.
    #[default]
    Local,
    /// Transaction has been received externally.
    ///
    /// This is usually considered an "untrusted" source, for example received from another in the
    /// network.
    External,
    /// Transaction is originated locally and is intended to remain private.
    ///
    /// This type of transaction should not be propagated to the network. It's meant for
    /// private usage within the local node only.
    Private,
}

// === impl TransactionOrigin ===

impl TransactionOrigin {
    /// Returns `true` only for [`TransactionOrigin::Local`].
    pub const fn is_local(&self) -> bool {
        match self {
            Self::Local => true,
            Self::External | Self::Private => false,
        }
    }

    /// Returns `true` only for [`TransactionOrigin::External`].
    pub const fn is_external(&self) -> bool {
        match self {
            Self::External => true,
            Self::Local | Self::Private => false,
        }
    }

    /// Returns `true` only for [`TransactionOrigin::Private`].
    pub const fn is_private(&self) -> bool {
        match self {
            Self::Private => true,
            Self::Local | Self::External => false,
        }
    }
}
676
/// Represents the kind of update to the canonical state.
///
/// Carried by [`CanonicalStateUpdate`] in its `update_kind` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolUpdateKind {
    /// The update was due to a block commit.
    Commit,
    /// The update was due to a reorganization.
    Reorg,
}
685
/// Represents changes after a new canonical block or range of canonical blocks was added to the
/// chain.
///
/// It is expected that this is only used if the added blocks are canonical to the pool's last known
/// block hash. In other words, the first added block of the range must be the child of the last
/// known block hash.
///
/// This is used to update the pool state accordingly.
#[derive(Clone, Debug)]
pub struct CanonicalStateUpdate<'a, B: Block> {
    /// The new tip block (borrowed sealed block, not just its hash).
    pub new_tip: &'a SealedBlock<B>,
    /// EIP-1559 Base fee of the _next_ (pending) block
    ///
    /// The base fee of a block depends on the utilization of the last block and its base fee.
    pub pending_block_base_fee: u64,
    /// EIP-4844 blob fee of the _next_ (pending) block
    ///
    /// Only after Cancun
    pub pending_block_blob_fee: Option<u128>,
    /// A set of changed accounts across a range of blocks.
    pub changed_accounts: Vec<ChangedAccount>,
    /// All mined transactions in the block range.
    pub mined_transactions: Vec<B256>,
    /// The kind of update to the canonical state.
    pub update_kind: PoolUpdateKind,
}
713
714impl<B> CanonicalStateUpdate<'_, B>
715where
716 B: Block,
717{
718 /// Returns the number of the tip block.
719 pub fn number(&self) -> u64 {
720 self.new_tip.number()
721 }
722
723 /// Returns the hash of the tip block.
724 pub fn hash(&self) -> B256 {
725 self.new_tip.hash()
726 }
727
728 /// Timestamp of the latest chain update
729 pub fn timestamp(&self) -> u64 {
730 self.new_tip.timestamp()
731 }
732
733 /// Returns the block info for the tip block.
734 pub fn block_info(&self) -> BlockInfo {
735 BlockInfo {
736 block_gas_limit: self.new_tip.gas_limit(),
737 last_seen_block_hash: self.hash(),
738 last_seen_block_number: self.number(),
739 pending_basefee: self.pending_block_base_fee,
740 pending_blob_fee: self.pending_block_blob_fee,
741 }
742 }
743}
744
745impl<B> fmt::Display for CanonicalStateUpdate<'_, B>
746where
747 B: Block,
748{
749 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
750 f.debug_struct("CanonicalStateUpdate")
751 .field("hash", &self.hash())
752 .field("number", &self.number())
753 .field("pending_block_base_fee", &self.pending_block_base_fee)
754 .field("pending_block_blob_fee", &self.pending_block_blob_fee)
755 .field("changed_accounts", &self.changed_accounts.len())
756 .field("mined_transactions", &self.mined_transactions.len())
757 .finish()
758 }
759}
760
/// Alias to restrict the [`BestTransactions`] items to the pool's transaction type.
///
/// Expands to a boxed [`BestTransactions`] trait object whose items are
/// `Arc<ValidPoolTransaction<_>>` of the given pool's `Transaction` type.
pub type BestTransactionsFor<Pool> = Box<
    dyn BestTransactions<Item = Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
>;
765
/// An `Iterator` that only returns transactions that are ready to be executed.
///
/// This makes no assumptions about the order of the transactions, but expects that _all_
/// transactions are valid (no nonce gaps.) for the tracked state of the pool.
///
/// Note: this iterator will always return the best transaction that it currently knows.
/// There is no guarantee transactions will be returned sequentially in decreasing
/// priority order.
pub trait BestTransactions: Iterator + Send {
    /// Mark the transaction as invalid.
    ///
    /// Implementers must ensure all subsequent transactions _don't_ depend on this transaction.
    /// In other words, this must remove the given transaction _and_ drain all transactions that
    /// depend on it.
    fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError);

    /// An iterator may be able to receive additional pending transactions that weren't present in
    /// the pool when it was created.
    ///
    /// This ensures that the iterator will return the best transaction that it currently knows and
    /// not listen to pool updates.
    fn no_updates(&mut self);

    /// Convenience function for [`Self::no_updates`] that returns the iterator again.
    fn without_updates(mut self) -> Self
    where
        Self: Sized,
    {
        self.no_updates();
        self
    }

    /// Skip all blob transactions.
    ///
    /// There's only limited blob space available in a block, once exhausted, EIP-4844 transactions
    /// can no longer be included.
    ///
    /// If called then the iterator will no longer yield blob transactions.
    ///
    /// Note: this will also exclude any transactions that depend on blob transactions.
    ///
    /// The default implementation calls [`Self::set_skip_blobs`] with `true`.
    fn skip_blobs(&mut self) {
        self.set_skip_blobs(true);
    }

    /// Controls whether the iterator skips blob transactions or not.
    ///
    /// If set to true, no blob transactions will be returned.
    fn set_skip_blobs(&mut self, skip_blobs: bool);

    /// Convenience function for [`Self::skip_blobs`] that returns the iterator again.
    fn without_blobs(mut self) -> Self
    where
        Self: Sized,
    {
        self.skip_blobs();
        self
    }

    /// Creates an iterator which uses a closure to determine whether a transaction should be
    /// returned by the iterator.
    ///
    /// All items the closure returns false for are marked as invalid via [`Self::mark_invalid`] and
    /// descendant transactions will be skipped.
    fn filter_transactions<P>(self, predicate: P) -> BestTransactionFilter<Self, P>
    where
        P: FnMut(&Self::Item) -> bool,
        Self: Sized,
    {
        BestTransactionFilter::new(self, predicate)
    }
}
837
838impl<T> BestTransactions for Box<T>
839where
840 T: BestTransactions + ?Sized,
841{
842 fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError) {
843 (**self).mark_invalid(transaction, kind)
844 }
845
846 fn no_updates(&mut self) {
847 (**self).no_updates();
848 }
849
850 fn skip_blobs(&mut self) {
851 (**self).skip_blobs();
852 }
853
854 fn set_skip_blobs(&mut self, skip_blobs: bool) {
855 (**self).set_skip_blobs(skip_blobs);
856 }
857}
858
859/// A no-op implementation that yields no transactions.
860impl<T> BestTransactions for std::iter::Empty<T> {
861 fn mark_invalid(&mut self, _tx: &T, _kind: InvalidPoolTransactionError) {}
862
863 fn no_updates(&mut self) {}
864
865 fn skip_blobs(&mut self) {}
866
867 fn set_skip_blobs(&mut self, _skip_blobs: bool) {}
868}
869
/// A filter that checks whether a transaction satisfies a set of conditions.
pub trait TransactionFilter {
    /// The type of the transaction to check.
    type Transaction;

    /// Returns `true` if the transaction satisfies the conditions of this filter.
    fn is_valid(&self, transaction: &Self::Transaction) -> bool;
}
878
/// A no-op implementation of [`TransactionFilter`] which
/// marks all transactions as valid.
///
/// The type parameter only records which transaction type the filter applies to; no value of `T`
/// is ever stored.
pub struct NoopTransactionFilter<T>(std::marker::PhantomData<T>);

// `Debug`, `Clone` and `Default` are implemented by hand: deriving them would force `T` to be
// `Debug`/`Clone`/`Default` as well, which isn't necessary for a marker-only type.

impl<T> fmt::Debug for NoopTransactionFilter<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same output as the derived implementation would produce.
        f.debug_tuple("NoopTransactionFilter").field(&self.0).finish()
    }
}

impl<T> Clone for NoopTransactionFilter<T> {
    fn clone(&self) -> Self {
        Self(std::marker::PhantomData)
    }
}

impl<T> Default for NoopTransactionFilter<T> {
    fn default() -> Self {
        Self(std::marker::PhantomData)
    }
}
891
892impl<T> TransactionFilter for NoopTransactionFilter<T> {
893 type Transaction = T;
894
895 fn is_valid(&self, _transaction: &Self::Transaction) -> bool {
896 true
897 }
898}
899
/// A helper type that bundles the attributes used when requesting the best transactions.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BestTransactionsAttributes {
    /// The base fee attribute for best transactions.
    pub basefee: u64,
    /// The blob fee attribute for best transactions.
    pub blob_fee: Option<u64>,
}

// === impl BestTransactionsAttributes ===

impl BestTransactionsAttributes {
    /// Creates a new `BestTransactionsAttributes` from the given basefee and optional blob fee.
    pub const fn new(basefee: u64, blob_fee: Option<u64>) -> Self {
        Self { basefee, blob_fee }
    }

    /// Creates a new `BestTransactionsAttributes` with the given basefee and no blob fee.
    pub const fn base_fee(basefee: u64) -> Self {
        Self { basefee, blob_fee: None }
    }

    /// Returns a copy of these attributes with the blob fee set to the given value.
    pub const fn with_blob_fee(self, blob_fee: u64) -> Self {
        Self { basefee: self.basefee, blob_fee: Some(blob_fee) }
    }
}
928
929/// Trait for transaction types used inside the pool.
930///
931/// This supports two transaction formats
932/// - Consensus format: the form the transaction takes when it is included in a block.
933/// - Pooled format: the form the transaction takes when it is gossiping around the network.
934///
935/// This distinction is necessary for the EIP-4844 blob transactions, which require an additional
936/// sidecar when they are gossiped around the network. It is expected that the `Consensus` format is
937/// a subset of the `Pooled` format.
938///
939/// The assumption is that fallible conversion from `Consensus` to `Pooled` will encapsulate
940/// handling of all valid `Consensus` transactions that can't be pooled (e.g Deposit transactions or
941/// blob-less EIP-4844 transactions).
942pub trait PoolTransaction:
943 alloy_consensus::Transaction + InMemorySize + Debug + Send + Sync + Clone
944{
945 /// Associated error type for the `try_from_consensus` method.
946 type TryFromConsensusError: fmt::Display;
947
948 /// Associated type representing the raw consensus variant of the transaction.
949 type Consensus: SignedTransaction + From<Self::Pooled>;
950
951 /// Associated type representing the recovered pooled variant of the transaction.
952 type Pooled: TryFrom<Self::Consensus, Error = Self::TryFromConsensusError> + SignedTransaction;
953
954 /// Define a method to convert from the `Consensus` type to `Self`
955 ///
956 /// Note: this _must_ fail on any transactions that cannot be pooled (e.g OP Deposit
957 /// transactions).
958 fn try_from_consensus(
959 tx: Recovered<Self::Consensus>,
960 ) -> Result<Self, Self::TryFromConsensusError> {
961 let (tx, signer) = tx.into_parts();
962 Ok(Self::from_pooled(Recovered::new_unchecked(tx.try_into()?, signer)))
963 }
964
965 /// Clone the transaction into a consensus variant.
966 ///
967 /// This method is preferred when the [`PoolTransaction`] already wraps the consensus variant.
968 fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
969 self.clone().into_consensus()
970 }
971
972 /// Define a method to convert from the `Self` type to `Consensus`
973 fn into_consensus(self) -> Recovered<Self::Consensus>;
974
975 /// Define a method to convert from the `Pooled` type to `Self`
976 fn from_pooled(pooled: Recovered<Self::Pooled>) -> Self;
977
978 /// Tries to convert the `Consensus` type into the `Pooled` type.
979 fn try_into_pooled(self) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
980 let consensus = self.into_consensus();
981 let (tx, signer) = consensus.into_parts();
982 Ok(Recovered::new_unchecked(tx.try_into()?, signer))
983 }
984
985 /// Converts the `Pooled` type into the `Consensus` type.
986 fn pooled_into_consensus(tx: Self::Pooled) -> Self::Consensus {
987 tx.into()
988 }
989
990 /// Hash of the transaction.
991 fn hash(&self) -> &TxHash;
992
993 /// The Sender of the transaction.
994 fn sender(&self) -> Address;
995
996 /// Reference to the Sender of the transaction.
997 fn sender_ref(&self) -> &Address;
998
999 /// Returns the cost that this transaction is allowed to consume:
1000 ///
1001 /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1002 /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1003 /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1004 /// max_blob_fee_per_gas * blob_gas_used`.
1005 fn cost(&self) -> &U256;
1006
1007 /// Returns the length of the rlp encoded transaction object
1008 ///
1009 /// Note: Implementations should cache this value.
1010 fn encoded_length(&self) -> usize;
1011
1012 /// Ensures that the transaction's code size does not exceed the provided `max_init_code_size`.
1013 ///
1014 /// This is specifically relevant for contract creation transactions ([`TxKind::Create`]),
1015 /// where the input data contains the initialization code. If the input code size exceeds
1016 /// the configured limit, an [`InvalidPoolTransactionError::ExceedsMaxInitCodeSize`] error is
1017 /// returned.
1018 fn ensure_max_init_code_size(
1019 &self,
1020 max_init_code_size: usize,
1021 ) -> Result<(), InvalidPoolTransactionError> {
1022 let input_len = self.input().len();
1023 if self.is_create() && input_len > max_init_code_size {
1024 Err(InvalidPoolTransactionError::ExceedsMaxInitCodeSize(input_len, max_init_code_size))
1025 } else {
1026 Ok(())
1027 }
1028 }
1029}
1030
/// Super trait for transactions that can be converted to and from Eth transactions intended for the
/// ethereum style pool.
///
/// This extends the [`PoolTransaction`] trait with additional methods that are specific to the
/// Ethereum pool, all of which deal with the EIP-4844 blob sidecar.
pub trait EthPoolTransaction: PoolTransaction {
    /// Extracts the blob sidecar from the transaction.
    fn take_blob(&mut self) -> EthBlobTransactionSidecar;

    /// A specialization for the EIP-4844 transaction type.
    /// Tries to reattach the blob sidecar to the transaction.
    ///
    /// This returns an option, but callers should ensure that the transaction is an EIP-4844
    /// transaction: [`Typed2718::is_eip4844`].
    fn try_into_pooled_eip4844(
        self,
        sidecar: Arc<BlobTransactionSidecar>,
    ) -> Option<Recovered<Self::Pooled>>;

    /// Tries to build `Self` from the recovered `Consensus` type together with a blob sidecar.
    ///
    /// Returns `None` if passed transaction is not a blob transaction.
    fn try_from_eip4844(
        tx: Recovered<Self::Consensus>,
        sidecar: BlobTransactionSidecar,
    ) -> Option<Self>;

    /// Validates the blob sidecar of the transaction with the given settings.
    fn validate_blob(
        &self,
        blob: &BlobTransactionSidecar,
        settings: &KzgSettings,
    ) -> Result<(), BlobTransactionValidationError>;
}
1065
/// The default [`PoolTransaction`] for the [Pool](crate::Pool) for Ethereum.
///
/// This type is essentially a wrapper around [`Recovered`] with additional
/// fields derived from the transaction that are frequently used by the pools for ordering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EthPooledTransaction<T = TransactionSigned> {
    /// The recovered transaction (transaction and its signer) in the consensus format.
    pub transaction: Recovered<T>,

    /// The cost that this transaction is allowed to consume:
    ///
    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
    /// max_blob_fee_per_gas * blob_gas_used`.
    pub cost: U256,

    /// This is the RLP length of the transaction, computed when the transaction is added to the
    /// pool.
    pub encoded_length: usize,

    /// The blob sidecar for this transaction, see [`EthBlobTransactionSidecar`].
    pub blob_sidecar: EthBlobTransactionSidecar,
}
1088
1089impl<T: SignedTransaction> EthPooledTransaction<T> {
1090 /// Create new instance of [Self].
1091 ///
1092 /// Caution: In case of blob transactions, this does marks the blob sidecar as
1093 /// [`EthBlobTransactionSidecar::Missing`]
1094 pub fn new(transaction: Recovered<T>, encoded_length: usize) -> Self {
1095 let mut blob_sidecar = EthBlobTransactionSidecar::None;
1096
1097 let gas_cost = U256::from(transaction.max_fee_per_gas())
1098 .saturating_mul(U256::from(transaction.gas_limit()));
1099
1100 let mut cost = gas_cost.saturating_add(transaction.value());
1101
1102 if let (Some(blob_gas_used), Some(max_fee_per_blob_gas)) =
1103 (transaction.blob_gas_used(), transaction.max_fee_per_blob_gas())
1104 {
1105 // Add max blob cost using saturating math to avoid overflow
1106 cost = cost.saturating_add(U256::from(
1107 max_fee_per_blob_gas.saturating_mul(blob_gas_used as u128),
1108 ));
1109
1110 // because the blob sidecar is not included in this transaction variant, mark it as
1111 // missing
1112 blob_sidecar = EthBlobTransactionSidecar::Missing;
1113 }
1114
1115 Self { transaction, cost, encoded_length, blob_sidecar }
1116 }
1117
1118 /// Return the reference to the underlying transaction.
1119 pub const fn transaction(&self) -> &Recovered<T> {
1120 &self.transaction
1121 }
1122}
1123
1124impl PoolTransaction for EthPooledTransaction {
1125 type TryFromConsensusError = ValueError<TransactionSigned>;
1126
1127 type Consensus = TransactionSigned;
1128
1129 type Pooled = PooledTransaction;
1130
1131 fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
1132 self.transaction().clone()
1133 }
1134
1135 fn into_consensus(self) -> Recovered<Self::Consensus> {
1136 self.transaction
1137 }
1138
1139 fn from_pooled(tx: Recovered<Self::Pooled>) -> Self {
1140 let encoded_length = tx.encode_2718_len();
1141 let (tx, signer) = tx.into_parts();
1142 match tx {
1143 PooledTransaction::Eip4844(tx) => {
1144 // include the blob sidecar
1145 let (tx, sig, hash) = tx.into_parts();
1146 let (tx, blob) = tx.into_parts();
1147 let tx = Signed::new_unchecked(tx, sig, hash);
1148 let tx = TransactionSigned::from(tx);
1149 let tx = Recovered::new_unchecked(tx, signer);
1150 let mut pooled = Self::new(tx, encoded_length);
1151 pooled.blob_sidecar = EthBlobTransactionSidecar::Present(blob);
1152 pooled
1153 }
1154 tx => {
1155 // no blob sidecar
1156 let tx = Recovered::new_unchecked(tx.into(), signer);
1157 Self::new(tx, encoded_length)
1158 }
1159 }
1160 }
1161
1162 /// Returns hash of the transaction.
1163 fn hash(&self) -> &TxHash {
1164 self.transaction.tx_hash()
1165 }
1166
1167 /// Returns the Sender of the transaction.
1168 fn sender(&self) -> Address {
1169 self.transaction.signer()
1170 }
1171
1172 /// Returns a reference to the Sender of the transaction.
1173 fn sender_ref(&self) -> &Address {
1174 self.transaction.signer_ref()
1175 }
1176
1177 /// Returns the cost that this transaction is allowed to consume:
1178 ///
1179 /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1180 /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1181 /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1182 /// max_blob_fee_per_gas * blob_gas_used`.
1183 fn cost(&self) -> &U256 {
1184 &self.cost
1185 }
1186
1187 /// Returns the length of the rlp encoded object
1188 fn encoded_length(&self) -> usize {
1189 self.encoded_length
1190 }
1191}
1192
/// The transaction type is the type of the wrapped transaction.
impl<T: Typed2718> Typed2718 for EthPooledTransaction<T> {
    fn ty(&self) -> u8 {
        self.transaction.ty()
    }
}
1198
/// Reports the size of the wrapped transaction only; the `cost`, `encoded_length` and
/// `blob_sidecar` fields are not added on top.
impl<T: InMemorySize> InMemorySize for EthPooledTransaction<T> {
    fn size(&self) -> usize {
        self.transaction.size()
    }
}
1204
/// Forwards every accessor listed here to the method of the same name on the wrapped
/// transaction.
impl<T: alloy_consensus::Transaction> alloy_consensus::Transaction for EthPooledTransaction<T> {
    fn chain_id(&self) -> Option<alloy_primitives::ChainId> {
        self.transaction.chain_id()
    }

    fn nonce(&self) -> u64 {
        self.transaction.nonce()
    }

    fn gas_limit(&self) -> u64 {
        self.transaction.gas_limit()
    }

    fn gas_price(&self) -> Option<u128> {
        self.transaction.gas_price()
    }

    fn max_fee_per_gas(&self) -> u128 {
        self.transaction.max_fee_per_gas()
    }

    fn max_priority_fee_per_gas(&self) -> Option<u128> {
        self.transaction.max_priority_fee_per_gas()
    }

    fn max_fee_per_blob_gas(&self) -> Option<u128> {
        self.transaction.max_fee_per_blob_gas()
    }

    fn priority_fee_or_price(&self) -> u128 {
        self.transaction.priority_fee_or_price()
    }

    fn effective_gas_price(&self, base_fee: Option<u64>) -> u128 {
        self.transaction.effective_gas_price(base_fee)
    }

    fn is_dynamic_fee(&self) -> bool {
        self.transaction.is_dynamic_fee()
    }

    fn kind(&self) -> TxKind {
        self.transaction.kind()
    }

    fn is_create(&self) -> bool {
        self.transaction.is_create()
    }

    fn value(&self) -> U256 {
        self.transaction.value()
    }

    fn input(&self) -> &Bytes {
        self.transaction.input()
    }

    fn access_list(&self) -> Option<&AccessList> {
        self.transaction.access_list()
    }

    fn blob_versioned_hashes(&self) -> Option<&[B256]> {
        self.transaction.blob_versioned_hashes()
    }

    fn authorization_list(&self) -> Option<&[SignedAuthorization]> {
        self.transaction.authorization_list()
    }
}
1274
1275impl EthPoolTransaction for EthPooledTransaction {
1276 fn take_blob(&mut self) -> EthBlobTransactionSidecar {
1277 if self.is_eip4844() {
1278 std::mem::replace(&mut self.blob_sidecar, EthBlobTransactionSidecar::Missing)
1279 } else {
1280 EthBlobTransactionSidecar::None
1281 }
1282 }
1283
1284 fn try_into_pooled_eip4844(
1285 self,
1286 sidecar: Arc<BlobTransactionSidecar>,
1287 ) -> Option<Recovered<Self::Pooled>> {
1288 let (signed_transaction, signer) = self.into_consensus().into_parts();
1289 let pooled_transaction =
1290 signed_transaction.try_into_pooled_eip4844(Arc::unwrap_or_clone(sidecar)).ok()?;
1291
1292 Some(Recovered::new_unchecked(pooled_transaction, signer))
1293 }
1294
1295 fn try_from_eip4844(
1296 tx: Recovered<Self::Consensus>,
1297 sidecar: BlobTransactionSidecar,
1298 ) -> Option<Self> {
1299 let (tx, signer) = tx.into_parts();
1300 tx.try_into_pooled_eip4844(sidecar)
1301 .ok()
1302 .map(|tx| tx.with_signer(signer))
1303 .map(Self::from_pooled)
1304 }
1305
1306 fn validate_blob(
1307 &self,
1308 sidecar: &BlobTransactionSidecar,
1309 settings: &KzgSettings,
1310 ) -> Result<(), BlobTransactionValidationError> {
1311 match self.transaction.inner().as_eip4844() {
1312 Some(tx) => tx.tx().validate_blob(sidecar, settings),
1313 _ => Err(BlobTransactionValidationError::NotBlobTransaction(self.ty())),
1314 }
1315 }
1316}
1317
/// Represents the state of the blob sidecar of the [`EthPooledTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EthBlobTransactionSidecar {
    /// This transaction does not have a blob sidecar
    None,
    /// This transaction has a blob sidecar (EIP-4844) but it is missing
    ///
    /// It was either extracted after being inserted into the pool or re-injected after reorg
    /// without the blob sidecar
    Missing,
    /// The EIP-4844 transaction was pulled from the network and still has its blob sidecar
    Present(BlobTransactionSidecar),
}
1331
1332impl EthBlobTransactionSidecar {
1333 /// Returns the blob sidecar if it is present
1334 pub const fn maybe_sidecar(&self) -> Option<&BlobTransactionSidecar> {
1335 match self {
1336 Self::Present(sidecar) => Some(sidecar),
1337 _ => None,
1338 }
1339 }
1340}
1341
/// Represents the current status of the pool.
#[derive(Debug, Clone, Copy, Default)]
pub struct PoolSize {
    /// Number of transactions in the _pending_ sub-pool.
    pub pending: usize,
    /// Reported size of transactions in the _pending_ sub-pool.
    pub pending_size: usize,
    /// Number of transactions in the _blob_ pool.
    pub blob: usize,
    /// Reported size of transactions in the _blob_ pool.
    pub blob_size: usize,
    /// Number of transactions in the _basefee_ pool.
    pub basefee: usize,
    /// Reported size of transactions in the _basefee_ sub-pool.
    pub basefee_size: usize,
    /// Number of transactions in the _queued_ sub-pool.
    pub queued: usize,
    /// Reported size of transactions in the _queued_ sub-pool.
    pub queued_size: usize,
    /// Number of all transactions of all sub-pools
    ///
    /// Note: this is the sum of ```pending + basefee + queued + blob```
    pub total: usize,
}

// === impl PoolSize ===

impl PoolSize {
    /// Asserts that the invariants of the pool size are met.
    ///
    /// # Panics
    ///
    /// Panics if `total` is not the sum of the `pending`, `basefee`, `queued` and `blob`
    /// transaction counts.
    #[cfg(test)]
    pub(crate) fn assert_invariants(&self) {
        assert_eq!(self.total, self.pending + self.basefee + self.queued + self.blob);
    }
}
1376
/// Information about the block the pool is currently tracking, together with the fee thresholds
/// derived for the next block.
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub struct BlockInfo {
    /// Hash for the currently tracked block.
    pub last_seen_block_hash: B256,
    /// Currently tracked block.
    pub last_seen_block_number: u64,
    /// Current block gas limit for the latest block.
    pub block_gas_limit: u64,
    /// Currently enforced base fee: the threshold for the basefee sub-pool.
    ///
    /// Note: this is the derived base fee of the _next_ block that builds on the block the pool is
    /// currently tracking.
    pub pending_basefee: u64,
    /// Currently enforced blob fee: the threshold for eip-4844 blob transactions.
    ///
    /// Note: this is the derived blob fee of the _next_ block that builds on the block the pool is
    /// currently tracking
    pub pending_blob_fee: Option<u128>,
}
1397
/// The limit to enforce for [`TransactionPool::get_pooled_transaction_elements`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum GetPooledTransactionLimit {
    /// No limit, return all transactions.
    None,
    /// Enforce a size limit on the returned transactions, for example 2MB
    ResponseSizeSoftLimit(usize),
}

impl GetPooledTransactionLimit {
    /// Returns `true` if the given size is strictly greater than the configured limit.
    ///
    /// A size equal to the limit does not exceed it, and [`Self::None`] is never exceeded.
    #[inline]
    pub const fn exceeds(&self, size: usize) -> bool {
        matches!(*self, Self::ResponseSizeSoftLimit(limit) if size > limit)
    }
}
1417
/// A Stream that yields full transactions from a specific subpool.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct NewSubpoolTransactionStream<Tx: PoolTransaction> {
    /// Receiver of new transaction events; events for other subpools are filtered out when
    /// reading from it.
    st: Receiver<NewTransactionEvent<Tx>>,
    /// The subpool whose events are yielded.
    subpool: SubPool,
}
1425
1426// === impl NewSubpoolTransactionStream ===
1427
1428impl<Tx: PoolTransaction> NewSubpoolTransactionStream<Tx> {
1429 /// Create a new stream that yields full transactions from the subpool
1430 pub const fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self {
1431 Self { st, subpool }
1432 }
1433
1434 /// Tries to receive the next value for this stream.
1435 pub fn try_recv(
1436 &mut self,
1437 ) -> Result<NewTransactionEvent<Tx>, tokio::sync::mpsc::error::TryRecvError> {
1438 loop {
1439 match self.st.try_recv() {
1440 Ok(event) => {
1441 if event.subpool == self.subpool {
1442 return Ok(event)
1443 }
1444 }
1445 Err(e) => return Err(e),
1446 }
1447 }
1448 }
1449}
1450
1451impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {
1452 type Item = NewTransactionEvent<Tx>;
1453
1454 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1455 loop {
1456 match ready!(self.st.poll_recv(cx)) {
1457 Some(event) => {
1458 if event.subpool == self.subpool {
1459 return Poll::Ready(Some(event))
1460 }
1461 }
1462 None => return Poll::Ready(None),
1463 }
1464 }
1465 }
1466}
1467
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{
        EthereumTxEnvelope, SignableTransaction, TxEip1559, TxEip2930, TxEip4844, TxEip7702,
        TxEnvelope, TxLegacy,
    };
    use alloy_eips::eip4844::DATA_GAS_PER_BLOB;
    use alloy_primitives::Signature;

    #[test]
    fn test_pool_size_invariants() {
        // `total` accounts for all four sub-pools, so the invariants hold.
        let pool_size = PoolSize {
            total: 10 + 5 + 8 + 7,
            pending: 10,
            pending_size: 1000,
            blob: 5,
            blob_size: 500,
            basefee: 8,
            basefee_size: 800,
            queued: 7,
            queued_size: 700,
        };

        pool_size.assert_invariants();
    }

    #[test]
    #[should_panic]
    fn test_pool_size_invariants_fail() {
        // `total` leaves out the queued transactions, so the check has to panic.
        let pool_size = PoolSize {
            total: 10 + 5 + 8,
            pending: 10,
            pending_size: 1000,
            blob: 5,
            blob_size: 500,
            basefee: 8,
            basefee_size: 800,
            queued: 7,
            queued_size: 700,
        };

        pool_size.assert_invariants();
    }

    #[test]
    fn test_eth_pooled_transaction_new_legacy() {
        // Legacy transaction: the cost is `gas_price * gas_limit + value`.
        let signed = TxLegacy {
            gas_price: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered = Recovered::new_unchecked(TxEnvelope::Legacy(signed), Default::default());

        let pooled_tx = EthPooledTransaction::new(recovered.clone(), 200);
        let expected_cost = U256::from(100) + U256::from(10 * 1000);

        assert_eq!(pooled_tx.cost, expected_cost);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
        assert_eq!(pooled_tx.encoded_length, 200);
        assert_eq!(pooled_tx.transaction, recovered);
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip2930() {
        // EIP-2930 transaction: the cost is `gas_price * gas_limit + value`.
        let signed = TxEip2930 {
            gas_price: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered = Recovered::new_unchecked(TxEnvelope::Eip2930(signed), Default::default());

        let pooled_tx = EthPooledTransaction::new(recovered.clone(), 200);
        let expected_cost = U256::from(100) + (U256::from(10 * 1000));

        assert_eq!(pooled_tx.cost, expected_cost);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
        assert_eq!(pooled_tx.encoded_length, 200);
        assert_eq!(pooled_tx.transaction, recovered);
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip1559() {
        // EIP-1559 transaction: the cost is `max_fee_per_gas * gas_limit + value`.
        let signed = TxEip1559 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered = Recovered::new_unchecked(TxEnvelope::Eip1559(signed), Default::default());

        let pooled_tx = EthPooledTransaction::new(recovered.clone(), 200);
        let expected_cost = U256::from(100) + U256::from(10 * 1000);

        assert_eq!(pooled_tx.cost, expected_cost);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
        assert_eq!(pooled_tx.encoded_length, 200);
        assert_eq!(pooled_tx.transaction, recovered);
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip4844() {
        // EIP-4844 transaction with a single blob: the blob fee is added on top of the gas cost
        // and the sidecar is reported as missing.
        let signed = TxEip4844 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            max_fee_per_blob_gas: 5,
            blob_versioned_hashes: vec![B256::default()],
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered =
            Recovered::new_unchecked(EthereumTxEnvelope::Eip4844(signed), Default::default());

        let pooled_tx = EthPooledTransaction::new(recovered.clone(), 300);
        let expected_cost =
            U256::from(100) + U256::from(10 * 1000) + U256::from(5 * DATA_GAS_PER_BLOB);

        assert_eq!(pooled_tx.cost, expected_cost);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::Missing);
        assert_eq!(pooled_tx.encoded_length, 300);
        assert_eq!(pooled_tx.transaction, recovered);
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip7702() {
        // EIP-7702 transaction: the cost is `max_fee_per_gas * gas_limit + value`.
        let signed = TxEip7702 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered = Recovered::new_unchecked(
            EthereumTxEnvelope::<TxEip4844>::Eip7702(signed),
            Default::default(),
        );

        let pooled_tx = EthPooledTransaction::new(recovered.clone(), 200);
        let expected_cost = U256::from(100) + U256::from(10 * 1000);

        assert_eq!(pooled_tx.cost, expected_cost);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
        assert_eq!(pooled_tx.encoded_length, 200);
        assert_eq!(pooled_tx.transaction, recovered);
    }

    #[test]
    fn test_pooled_transaction_limit() {
        const ONE_MB: usize = 1024 * 1024;

        // Without a limit no size is ever reported as exceeding.
        let limit_none = GetPooledTransactionLimit::None;
        assert!(!limit_none.exceeds(1000));

        // Soft limit of 2MB.
        let size_limit_2mb = GetPooledTransactionLimit::ResponseSizeSoftLimit(2 * ONE_MB);

        // 1MB is below the limit.
        assert!(!size_limit_2mb.exceeds(ONE_MB));

        // Exactly 2MB equals the limit and therefore does not exceed it.
        assert!(!size_limit_2mb.exceeds(2 * ONE_MB));

        // 3MB is above the limit.
        assert!(size_limit_2mb.exceeds(3 * ONE_MB));
    }
}