reth_optimism_txpool/
maintain.rs

1//! Support for maintaining the state of the transaction pool
2
3use crate::conditional::MaybeConditionalTransaction;
4use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
5use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
6use reth_chain_state::CanonStateNotification;
7use reth_metrics::{metrics::Counter, Metrics};
8use reth_primitives_traits::NodePrimitives;
9use reth_transaction_pool::TransactionPool;
10
11/// Transaction pool maintenance metrics
12#[derive(Metrics)]
13#[metrics(scope = "transaction_pool")]
14struct MaintainPoolMetrics {
15    /// Counter indicating the number of conditional transactions removed from
16    /// the pool because of exceeded block attributes.
17    removed_tx_conditional: Counter,
18}
19
20impl MaintainPoolMetrics {
21    #[inline]
22    fn inc_removed_tx_conditional(&self, count: usize) {
23        self.removed_tx_conditional.increment(count as u64);
24    }
25}
26
27/// Returns a spawnable future for maintaining the state of the transaction pool.
28pub fn maintain_transaction_pool_future<N, Pool, St>(
29    pool: Pool,
30    events: St,
31) -> BoxFuture<'static, ()>
32where
33    N: NodePrimitives,
34    Pool: TransactionPool + 'static,
35    Pool::Transaction: MaybeConditionalTransaction,
36    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
37{
38    async move {
39        maintain_transaction_pool(pool, events).await;
40    }
41    .boxed()
42}
43
44/// Maintains the state of the transaction pool by handling new blocks and reorgs.
45///
46/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
47pub async fn maintain_transaction_pool<N, Pool, St>(pool: Pool, mut events: St)
48where
49    N: NodePrimitives,
50    Pool: TransactionPool,
51    Pool::Transaction: MaybeConditionalTransaction,
52    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
53{
54    let metrics = MaintainPoolMetrics::default();
55    loop {
56        let Some(event) = events.next().await else { break };
57        if let CanonStateNotification::Commit { new } = event {
58            if new.is_empty() {
59                continue;
60            }
61            let block_attr = BlockConditionalAttributes {
62                number: new.tip().number(),
63                timestamp: new.tip().timestamp(),
64            };
65            let mut to_remove = Vec::new();
66            for tx in &pool.pooled_transactions() {
67                if tx.transaction.has_exceeded_block_attributes(&block_attr) {
68                    to_remove.push(*tx.hash());
69                }
70            }
71            if !to_remove.is_empty() {
72                metrics.inc_removed_tx_conditional(to_remove.len());
73                let _ = pool.remove_transactions(to_remove);
74            }
75        }
76    }
77}