1//! Support for maintaining the state of the transaction pool
/// Validity window, in seconds (600 s = 10 min), used when checking an interop transaction
/// against the supervisor.
///
/// It is passed to `SupervisorClient::is_valid_cross_tx` and, after a passing check, added to the
/// tip timestamp to form the transaction's new interop deadline.
const TRANSACTION_VALIDITY_WINDOW: u64 = 600;
/// Threshold in seconds passed to `is_stale_interop`: an interop transaction with less than this
/// much validity left is revalidated.
const OFFSET_TIME: u64 = 60;
/// Maximum number of supervisor requests in flight at the same time.
const MAX_SUPERVISOR_QUERIES: usize = 10;
910use crate::{
11conditional::MaybeConditionalTransaction,
12 interop::{is_stale_interop, is_valid_interop, MaybeInteropTransaction},
13supervisor::SupervisorClient,
14};
15use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader, Transaction};
16use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
17use metrics::Gauge;
18use reth_chain_state::CanonStateNotification;
19use reth_metrics::{metrics::Counter, Metrics};
20use reth_primitives_traits::NodePrimitives;
21use reth_transaction_pool::{error::PoolTransactionError, PoolTransaction, TransactionPool};
22use std::sync::Arc;
/// Metrics recorded while maintaining conditional transactions in the transaction pool.
///
/// Updated by [`maintain_transaction_pool_conditional`].
// NOTE(review): the `Metrics` derive presumably uses the field doc comments as metric
// descriptions - confirm before rewording them.
#[derive(Metrics)]
#[metrics(scope = "transaction_pool")]
struct MaintainPoolConditionalMetrics {
    /// Counter indicating the number of conditional transactions removed from
    /// the pool because of exceeded block attributes.
    removed_tx_conditional: Counter,
}
3233impl MaintainPoolConditionalMetrics {
34#[inline]
35fn inc_removed_tx_conditional(&self, count: usize) {
36self.removed_tx_conditional.increment(countas u64);
37 }
38}
3940/// Transaction pool maintenance metrics
41#[derive(Metrics)]
42#[metrics(scope = "transaction_pool")]
43struct MaintainPoolInteropMetrics {
44/// Counter indicating the number of conditional transactions removed from
45 /// the pool because of exceeded block attributes.
46removed_tx_interop: Counter,
47/// Number of interop transactions currently in the pool
48pooled_interop_transactions: Gauge,
4950/// Counter for interop transactions that became stale and need revalidation
51stale_interop_transactions: Counter,
52// TODO: we also should add metric for (hash, counter) to check number of validation per tx
53 // TODO: we should add some timing metric in here to check supervisor congestion
54}
5556impl MaintainPoolInteropMetrics {
57#[inline]
58fn inc_removed_tx_interop(&self, count: usize) {
59self.removed_tx_interop.increment(countas u64);
60 }
61#[inline]
62fn set_interop_txs_in_pool(&self, count: usize) {
63self.pooled_interop_transactions.set(countas f64);
64 }
6566#[inline]
67fn inc_stale_tx_interop(&self, count: usize) {
68self.stale_interop_transactions.increment(countas u64);
69 }
70}
71/// Returns a spawnable future for maintaining the state of the conditional txs in the transaction
72/// pool.
73pub fn maintain_transaction_pool_conditional_future<N, Pool, St>(
74 pool: Pool,
75 events: St,
76) -> BoxFuture<'static, ()>
77where
78N: NodePrimitives,
79 Pool: TransactionPool + 'static,
80 Pool::Transaction: MaybeConditionalTransaction,
81 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
82{
83async move {
84maintain_transaction_pool_conditional(pool, events).await;
85 }
86 .boxed()
87}
8889/// Maintains the state of the conditional tx in the transaction pool by handling new blocks and
90/// reorgs.
91///
92/// This listens for any new blocks and reorgs and updates the conditional txs in the
93/// transaction pool's state accordingly
94pub async fn maintain_transaction_pool_conditional<N, Pool, St>(pool: Pool, mut events: St)
95where
96N: NodePrimitives,
97 Pool: TransactionPool,
98 Pool::Transaction: MaybeConditionalTransaction,
99 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
100{
101let metrics = MaintainPoolConditionalMetrics::default();
102loop {
103let Some(event) = events.next().await else { break };
104if let CanonStateNotification::Commit { new } = event {
105let block_attr = BlockConditionalAttributes {
106 number: new.tip().number(),
107 timestamp: new.tip().timestamp(),
108 };
109let mut to_remove = Vec::new();
110for tx in &pool.pooled_transactions() {
111if tx.transaction.has_exceeded_block_attributes(&block_attr) {
112 to_remove.push(*tx.hash());
113 }
114 }
115if !to_remove.is_empty() {
116let removed = pool.remove_transactions(to_remove);
117 metrics.inc_removed_tx_conditional(removed.len());
118 }
119 }
120 }
121}
122123/// Returns a spawnable future for maintaining the state of the interop tx in the transaction pool.
124pub fn maintain_transaction_pool_interop_future<N, Pool, St>(
125 pool: Pool,
126 events: St,
127 supervisor_client: SupervisorClient,
128) -> BoxFuture<'static, ()>
129where
130N: NodePrimitives,
131 Pool: TransactionPool + 'static,
132 Pool::Transaction: MaybeInteropTransaction,
133 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
134{
135async move {
136maintain_transaction_pool_interop(pool, events, supervisor_client).await;
137 }
138 .boxed()
139}
140141/// Maintains the state of the interop tx in the transaction pool by handling new blocks and reorgs.
142///
143/// This listens for any new blocks and reorgs and updates the interop tx in the transaction pool's
144/// state accordingly
145pub async fn maintain_transaction_pool_interop<N, Pool, St>(
146 pool: Pool,
147mut events: St,
148 supervisor_client: SupervisorClient,
149) where
150N: NodePrimitives,
151 Pool: TransactionPool,
152 Pool::Transaction: MaybeInteropTransaction,
153 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
154{
155let metrics = MaintainPoolInteropMetrics::default();
156let supervisor_client = Arc::new(supervisor_client);
157loop {
158let Some(event) = events.next().await else { break };
159if let CanonStateNotification::Commit { new } = event {
160let timestamp = new.tip().timestamp();
161let mut to_remove = Vec::new();
162let mut to_revalidate = Vec::new();
163let mut interop_count = 0;
164for tx in &pool.pooled_transactions() {
165// Only interop txs have this field set
166if let Some(interop) = tx.transaction.interop_deadline() {
167 interop_count += 1;
168if !is_valid_interop(interop, timestamp) {
169// That means tx didn't revalidated during [`OFFSET_TIME`] time
170 // We could assume that it won't be validated at all and remove it
171to_remove.push(*tx.hash());
172 } else if is_stale_interop(interop, timestamp, OFFSET_TIME) {
173// If tx has less then [`OFFSET_TIME`] of valid time we revalidate it
174to_revalidate.push(tx.clone())
175 }
176 }
177 }
178179 metrics.set_interop_txs_in_pool(interop_count);
180if !to_revalidate.is_empty() {
181 metrics.inc_stale_tx_interop(to_revalidate.len());
182let checks_stream =
183 futures_util::stream::iter(to_revalidate.into_iter().map(|tx| {
184let supervisor_client = supervisor_client.clone();
185async move {
186let check = supervisor_client
187 .is_valid_cross_tx(
188 tx.transaction.access_list(),
189 tx.transaction.hash(),
190 timestamp,
191Some(TRANSACTION_VALIDITY_WINDOW),
192// We could assume that interop is enabled, because
193 // tx.transaction.interop() would be set only in
194 // this case
195true,
196 )
197 .await;
198 (tx.clone(), check)
199 }
200 }))
201 .buffered(MAX_SUPERVISOR_QUERIES);
202futures_util::pin_mut!(checks_stream);
203while let Some((tx, check)) = checks_stream.next().await {
204if let Some(Err(err)) = check {
205// We remove only bad transaction. If error caused by supervisor instability
206 // or other fixable issues transaction would be validated on next state
207 // change, so we ignore it
208if err.is_bad_transaction() {
209 to_remove.push(*tx.transaction.hash());
210 }
211 } else {
212 tx.transaction.set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW)
213 }
214 }
215 }
216if !to_remove.is_empty() {
217let removed = pool.remove_transactions(to_remove);
218 metrics.inc_removed_tx_interop(removed.len());
219 }
220 }
221 }
222}