// reth_optimism_txpool/maintain.rs
1use crate::conditional::MaybeConditionalTransaction;
4use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
5use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
6use reth_chain_state::CanonStateNotification;
7use reth_metrics::{metrics::Counter, Metrics};
8use reth_primitives_traits::NodePrimitives;
9use reth_transaction_pool::TransactionPool;
10
11#[derive(Metrics)]
13#[metrics(scope = "transaction_pool")]
14struct MaintainPoolMetrics {
15 removed_tx_conditional: Counter,
18}
19
20impl MaintainPoolMetrics {
21 #[inline]
22 fn inc_removed_tx_conditional(&self, count: usize) {
23 self.removed_tx_conditional.increment(count as u64);
24 }
25}
26
27pub fn maintain_transaction_pool_future<N, Pool, St>(
29 pool: Pool,
30 events: St,
31) -> BoxFuture<'static, ()>
32where
33 N: NodePrimitives,
34 Pool: TransactionPool + 'static,
35 Pool::Transaction: MaybeConditionalTransaction,
36 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
37{
38 async move {
39 maintain_transaction_pool(pool, events).await;
40 }
41 .boxed()
42}
43
44pub async fn maintain_transaction_pool<N, Pool, St>(pool: Pool, mut events: St)
48where
49 N: NodePrimitives,
50 Pool: TransactionPool,
51 Pool::Transaction: MaybeConditionalTransaction,
52 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
53{
54 let metrics = MaintainPoolMetrics::default();
55 loop {
56 let Some(event) = events.next().await else { break };
57 if let CanonStateNotification::Commit { new } = event {
58 if new.is_empty() {
59 continue;
60 }
61 let block_attr = BlockConditionalAttributes {
62 number: new.tip().number(),
63 timestamp: new.tip().timestamp(),
64 };
65 let mut to_remove = Vec::new();
66 for tx in &pool.pooled_transactions() {
67 if tx.transaction.has_exceeded_block_attributes(&block_attr) {
68 to_remove.push(*tx.hash());
69 }
70 }
71 if !to_remove.is_empty() {
72 metrics.inc_removed_tx_conditional(to_remove.len());
73 let _ = pool.remove_transactions(to_remove);
74 }
75 }
76 }
77}