reth_optimism_txpool/
maintain.rs

1//! Support for maintaining the state of the transaction pool
2
/// Validity window of a supervisor check, in seconds (10 minutes).
///
/// After a successful revalidation the interop deadline of a transaction is set to the tip
/// timestamp plus this value.
const TRANSACTION_VALIDITY_WINDOW: u64 = 10 * 60;
/// Offset in seconds handed to `is_stale_interop`, together with the deadline and the tip
/// timestamp, to decide whether a still-valid interop transaction must be revalidated.
const OFFSET_TIME: u64 = 60;
/// Maximum number of supervisor requests issued at the same time during revalidation.
const MAX_SUPERVISOR_QUERIES: usize = 10;
9
10use crate::{
11    conditional::MaybeConditionalTransaction,
12    interop::{is_stale_interop, is_valid_interop, MaybeInteropTransaction},
13    supervisor::SupervisorClient,
14};
15use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
16use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
17use metrics::{Gauge, Histogram};
18use reth_chain_state::CanonStateNotification;
19use reth_metrics::{metrics::Counter, Metrics};
20use reth_primitives_traits::NodePrimitives;
21use reth_transaction_pool::{error::PoolTransactionError, PoolTransaction, TransactionPool};
22use std::time::Instant;
23use tracing::warn;
24
25/// Transaction pool maintenance metrics
26#[derive(Metrics)]
27#[metrics(scope = "transaction_pool")]
28struct MaintainPoolConditionalMetrics {
29    /// Counter indicating the number of conditional transactions removed from
30    /// the pool because of exceeded block attributes.
31    removed_tx_conditional: Counter,
32}
33
34impl MaintainPoolConditionalMetrics {
35    #[inline]
36    fn inc_removed_tx_conditional(&self, count: usize) {
37        self.removed_tx_conditional.increment(count as u64);
38    }
39}
40
41/// Transaction pool maintenance metrics
42#[derive(Metrics)]
43#[metrics(scope = "transaction_pool")]
44struct MaintainPoolInteropMetrics {
45    /// Counter indicating the number of conditional transactions removed from
46    /// the pool because of exceeded block attributes.
47    removed_tx_interop: Counter,
48    /// Number of interop transactions currently in the pool
49    pooled_interop_transactions: Gauge,
50
51    /// Counter for interop transactions that became stale and need revalidation
52    stale_interop_transactions: Counter,
53    // TODO: we also should add metric for (hash, counter) to check number of validation per tx
54    /// Histogram for measuring supervisor revalidation duration (congestion metric)
55    supervisor_revalidation_duration_seconds: Histogram,
56}
57
58impl MaintainPoolInteropMetrics {
59    #[inline]
60    fn inc_removed_tx_interop(&self, count: usize) {
61        self.removed_tx_interop.increment(count as u64);
62    }
63    #[inline]
64    fn set_interop_txs_in_pool(&self, count: usize) {
65        self.pooled_interop_transactions.set(count as f64);
66    }
67
68    #[inline]
69    fn inc_stale_tx_interop(&self, count: usize) {
70        self.stale_interop_transactions.increment(count as u64);
71    }
72
73    /// Record supervisor revalidation duration
74    #[inline]
75    fn record_supervisor_duration(&self, duration: std::time::Duration) {
76        self.supervisor_revalidation_duration_seconds.record(duration.as_secs_f64());
77    }
78}
79/// Returns a spawnable future for maintaining the state of the conditional txs in the transaction
80/// pool.
81pub fn maintain_transaction_pool_conditional_future<N, Pool, St>(
82    pool: Pool,
83    events: St,
84) -> BoxFuture<'static, ()>
85where
86    N: NodePrimitives,
87    Pool: TransactionPool + 'static,
88    Pool::Transaction: MaybeConditionalTransaction,
89    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
90{
91    async move {
92        maintain_transaction_pool_conditional(pool, events).await;
93    }
94    .boxed()
95}
96
97/// Maintains the state of the conditional tx in the transaction pool by handling new blocks and
98/// reorgs.
99///
100/// This listens for any new blocks and reorgs and updates the conditional txs in the
101/// transaction pool's state accordingly
102pub async fn maintain_transaction_pool_conditional<N, Pool, St>(pool: Pool, mut events: St)
103where
104    N: NodePrimitives,
105    Pool: TransactionPool,
106    Pool::Transaction: MaybeConditionalTransaction,
107    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
108{
109    let metrics = MaintainPoolConditionalMetrics::default();
110    loop {
111        let Some(event) = events.next().await else { break };
112        if let CanonStateNotification::Commit { new } = event {
113            let block_attr = BlockConditionalAttributes {
114                number: new.tip().number(),
115                timestamp: new.tip().timestamp(),
116            };
117            let mut to_remove = Vec::new();
118            for tx in &pool.pooled_transactions() {
119                if tx.transaction.has_exceeded_block_attributes(&block_attr) {
120                    to_remove.push(*tx.hash());
121                }
122            }
123            if !to_remove.is_empty() {
124                let removed = pool.remove_transactions(to_remove);
125                metrics.inc_removed_tx_conditional(removed.len());
126            }
127        }
128    }
129}
130
131/// Returns a spawnable future for maintaining the state of the interop tx in the transaction pool.
132pub fn maintain_transaction_pool_interop_future<N, Pool, St>(
133    pool: Pool,
134    events: St,
135    supervisor_client: SupervisorClient,
136) -> BoxFuture<'static, ()>
137where
138    N: NodePrimitives,
139    Pool: TransactionPool + 'static,
140    Pool::Transaction: MaybeInteropTransaction,
141    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
142{
143    async move {
144        maintain_transaction_pool_interop(pool, events, supervisor_client).await;
145    }
146    .boxed()
147}
148
149/// Maintains the state of the interop tx in the transaction pool by handling new blocks and reorgs.
150///
151/// This listens for any new blocks and reorgs and updates the interop tx in the transaction pool's
152/// state accordingly
153pub async fn maintain_transaction_pool_interop<N, Pool, St>(
154    pool: Pool,
155    mut events: St,
156    supervisor_client: SupervisorClient,
157) where
158    N: NodePrimitives,
159    Pool: TransactionPool,
160    Pool::Transaction: MaybeInteropTransaction,
161    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
162{
163    let metrics = MaintainPoolInteropMetrics::default();
164
165    loop {
166        let Some(event) = events.next().await else { break };
167        if let CanonStateNotification::Commit { new } = event {
168            let timestamp = new.tip().timestamp();
169            let mut to_remove = Vec::new();
170            let mut to_revalidate = Vec::new();
171            let mut interop_count = 0;
172
173            // scan all pooled interop transactions
174            for pooled_tx in pool.pooled_transactions() {
175                if let Some(interop_deadline_val) = pooled_tx.transaction.interop_deadline() {
176                    interop_count += 1;
177                    if !is_valid_interop(interop_deadline_val, timestamp) {
178                        to_remove.push(*pooled_tx.transaction.hash());
179                    } else if is_stale_interop(interop_deadline_val, timestamp, OFFSET_TIME) {
180                        to_revalidate.push(pooled_tx.transaction.clone());
181                    }
182                }
183            }
184
185            metrics.set_interop_txs_in_pool(interop_count);
186
187            if !to_revalidate.is_empty() {
188                metrics.inc_stale_tx_interop(to_revalidate.len());
189
190                let revalidation_start = Instant::now();
191                let revalidation_stream = supervisor_client.revalidate_interop_txs_stream(
192                    to_revalidate,
193                    timestamp,
194                    TRANSACTION_VALIDITY_WINDOW,
195                    MAX_SUPERVISOR_QUERIES,
196                );
197
198                futures_util::pin_mut!(revalidation_stream);
199
200                while let Some((tx_item_from_stream, validation_result)) =
201                    revalidation_stream.next().await
202                {
203                    match validation_result {
204                        Some(Ok(())) => {
205                            tx_item_from_stream
206                                .set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW);
207                        }
208                        Some(Err(err)) => {
209                            if err.is_bad_transaction() {
210                                to_remove.push(*tx_item_from_stream.hash());
211                            }
212                        }
213                        None => {
214                            warn!(
215                                target: "txpool",
216                                hash = %tx_item_from_stream.hash(),
217                                "Interop transaction no longer considered cross-chain during revalidation; removing."
218                            );
219                            to_remove.push(*tx_item_from_stream.hash());
220                        }
221                    }
222                }
223
224                metrics.record_supervisor_duration(revalidation_start.elapsed());
225            }
226
227            if !to_remove.is_empty() {
228                let removed = pool.remove_transactions(to_remove);
229                metrics.inc_removed_tx_interop(removed.len());
230            }
231        }
232    }
233}