reth_optimism_txpool/
maintain.rs

1//! Support for maintaining the state of the transaction pool
2
/// Validity window, in seconds (10 minutes), used for interop transactions: it is passed to the
/// supervisor check and added to the tip timestamp to form a revalidated transaction's new
/// deadline.
const TRANSACTION_VALIDITY_WINDOW: u64 = 10 * 60;
/// Threshold, in seconds, handed to `is_stale_interop`: an interop transaction with less than
/// this much validity left is queued for revalidation.
const OFFSET_TIME: u64 = 60;
/// Maximum number of supervisor requests that are in flight at the same time.
const MAX_SUPERVISOR_QUERIES: usize = 10;
9
10use crate::{
11    conditional::MaybeConditionalTransaction,
12    interop::{is_stale_interop, is_valid_interop, MaybeInteropTransaction},
13    supervisor::SupervisorClient,
14};
15use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader, Transaction};
16use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
17use metrics::Gauge;
18use reth_chain_state::CanonStateNotification;
19use reth_metrics::{metrics::Counter, Metrics};
20use reth_primitives_traits::NodePrimitives;
21use reth_transaction_pool::{error::PoolTransactionError, PoolTransaction, TransactionPool};
22use std::sync::Arc;
23
24/// Transaction pool maintenance metrics
25#[derive(Metrics)]
26#[metrics(scope = "transaction_pool")]
27struct MaintainPoolConditionalMetrics {
28    /// Counter indicating the number of conditional transactions removed from
29    /// the pool because of exceeded block attributes.
30    removed_tx_conditional: Counter,
31}
32
33impl MaintainPoolConditionalMetrics {
34    #[inline]
35    fn inc_removed_tx_conditional(&self, count: usize) {
36        self.removed_tx_conditional.increment(count as u64);
37    }
38}
39
40/// Transaction pool maintenance metrics
41#[derive(Metrics)]
42#[metrics(scope = "transaction_pool")]
43struct MaintainPoolInteropMetrics {
44    /// Counter indicating the number of conditional transactions removed from
45    /// the pool because of exceeded block attributes.
46    removed_tx_interop: Counter,
47    /// Number of interop transactions currently in the pool
48    pooled_interop_transactions: Gauge,
49
50    /// Counter for interop transactions that became stale and need revalidation
51    stale_interop_transactions: Counter,
52    // TODO: we also should add metric for (hash, counter) to check number of validation per tx
53    // TODO: we should add some timing metric in here to check supervisor congestion
54}
55
56impl MaintainPoolInteropMetrics {
57    #[inline]
58    fn inc_removed_tx_interop(&self, count: usize) {
59        self.removed_tx_interop.increment(count as u64);
60    }
61    #[inline]
62    fn set_interop_txs_in_pool(&self, count: usize) {
63        self.pooled_interop_transactions.set(count as f64);
64    }
65
66    #[inline]
67    fn inc_stale_tx_interop(&self, count: usize) {
68        self.stale_interop_transactions.increment(count as u64);
69    }
70}
71/// Returns a spawnable future for maintaining the state of the conditional txs in the transaction
72/// pool.
73pub fn maintain_transaction_pool_conditional_future<N, Pool, St>(
74    pool: Pool,
75    events: St,
76) -> BoxFuture<'static, ()>
77where
78    N: NodePrimitives,
79    Pool: TransactionPool + 'static,
80    Pool::Transaction: MaybeConditionalTransaction,
81    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
82{
83    async move {
84        maintain_transaction_pool_conditional(pool, events).await;
85    }
86    .boxed()
87}
88
89/// Maintains the state of the conditional tx in the transaction pool by handling new blocks and
90/// reorgs.
91///
92/// This listens for any new blocks and reorgs and updates the conditional txs in the
93/// transaction pool's state accordingly
94pub async fn maintain_transaction_pool_conditional<N, Pool, St>(pool: Pool, mut events: St)
95where
96    N: NodePrimitives,
97    Pool: TransactionPool,
98    Pool::Transaction: MaybeConditionalTransaction,
99    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
100{
101    let metrics = MaintainPoolConditionalMetrics::default();
102    loop {
103        let Some(event) = events.next().await else { break };
104        if let CanonStateNotification::Commit { new } = event {
105            let block_attr = BlockConditionalAttributes {
106                number: new.tip().number(),
107                timestamp: new.tip().timestamp(),
108            };
109            let mut to_remove = Vec::new();
110            for tx in &pool.pooled_transactions() {
111                if tx.transaction.has_exceeded_block_attributes(&block_attr) {
112                    to_remove.push(*tx.hash());
113                }
114            }
115            if !to_remove.is_empty() {
116                let removed = pool.remove_transactions(to_remove);
117                metrics.inc_removed_tx_conditional(removed.len());
118            }
119        }
120    }
121}
122
123/// Returns a spawnable future for maintaining the state of the interop tx in the transaction pool.
124pub fn maintain_transaction_pool_interop_future<N, Pool, St>(
125    pool: Pool,
126    events: St,
127    supervisor_client: SupervisorClient,
128) -> BoxFuture<'static, ()>
129where
130    N: NodePrimitives,
131    Pool: TransactionPool + 'static,
132    Pool::Transaction: MaybeInteropTransaction,
133    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
134{
135    async move {
136        maintain_transaction_pool_interop(pool, events, supervisor_client).await;
137    }
138    .boxed()
139}
140
141/// Maintains the state of the interop tx in the transaction pool by handling new blocks and reorgs.
142///
143/// This listens for any new blocks and reorgs and updates the interop tx in the transaction pool's
144/// state accordingly
145pub async fn maintain_transaction_pool_interop<N, Pool, St>(
146    pool: Pool,
147    mut events: St,
148    supervisor_client: SupervisorClient,
149) where
150    N: NodePrimitives,
151    Pool: TransactionPool,
152    Pool::Transaction: MaybeInteropTransaction,
153    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
154{
155    let metrics = MaintainPoolInteropMetrics::default();
156    let supervisor_client = Arc::new(supervisor_client);
157    loop {
158        let Some(event) = events.next().await else { break };
159        if let CanonStateNotification::Commit { new } = event {
160            let timestamp = new.tip().timestamp();
161            let mut to_remove = Vec::new();
162            let mut to_revalidate = Vec::new();
163            let mut interop_count = 0;
164            for tx in &pool.pooled_transactions() {
165                // Only interop txs have this field set
166                if let Some(interop) = tx.transaction.interop_deadline() {
167                    interop_count += 1;
168                    if !is_valid_interop(interop, timestamp) {
169                        // That means tx didn't revalidated during [`OFFSET_TIME`] time
170                        // We could assume that it won't be validated at all and remove it
171                        to_remove.push(*tx.hash());
172                    } else if is_stale_interop(interop, timestamp, OFFSET_TIME) {
173                        // If tx has less then [`OFFSET_TIME`] of valid time we revalidate it
174                        to_revalidate.push(tx.clone())
175                    }
176                }
177            }
178
179            metrics.set_interop_txs_in_pool(interop_count);
180            if !to_revalidate.is_empty() {
181                metrics.inc_stale_tx_interop(to_revalidate.len());
182                let checks_stream =
183                    futures_util::stream::iter(to_revalidate.into_iter().map(|tx| {
184                        let supervisor_client = supervisor_client.clone();
185                        async move {
186                            let check = supervisor_client
187                                .is_valid_cross_tx(
188                                    tx.transaction.access_list(),
189                                    tx.transaction.hash(),
190                                    timestamp,
191                                    Some(TRANSACTION_VALIDITY_WINDOW),
192                                    // We could assume that interop is enabled, because
193                                    // tx.transaction.interop() would be set only in
194                                    // this case
195                                    true,
196                                )
197                                .await;
198                            (tx.clone(), check)
199                        }
200                    }))
201                    .buffered(MAX_SUPERVISOR_QUERIES);
202                futures_util::pin_mut!(checks_stream);
203                while let Some((tx, check)) = checks_stream.next().await {
204                    if let Some(Err(err)) = check {
205                        // We remove only bad transaction. If error caused by supervisor instability
206                        // or other fixable issues transaction would be validated on next state
207                        // change, so we ignore it
208                        if err.is_bad_transaction() {
209                            to_remove.push(*tx.transaction.hash());
210                        }
211                    } else {
212                        tx.transaction.set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW)
213                    }
214                }
215            }
216            if !to_remove.is_empty() {
217                let removed = pool.remove_transactions(to_remove);
218                metrics.inc_removed_tx_interop(removed.len());
219            }
220        }
221    }
222}