reth_optimism_txpool/
maintain.rs1const TRANSACTION_VALIDITY_WINDOW: u64 = 600;
5const OFFSET_TIME: u64 = 60;
7const MAX_SUPERVISOR_QUERIES: usize = 10;
9
10use crate::{
11 conditional::MaybeConditionalTransaction,
12 interop::{is_stale_interop, is_valid_interop, MaybeInteropTransaction},
13 supervisor::SupervisorClient,
14};
15use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
16use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
17use metrics::{Gauge, Histogram};
18use reth_chain_state::CanonStateNotification;
19use reth_metrics::{metrics::Counter, Metrics};
20use reth_primitives_traits::NodePrimitives;
21use reth_transaction_pool::{error::PoolTransactionError, PoolTransaction, TransactionPool};
22use std::time::Instant;
23use tracing::warn;
24
25#[derive(Metrics)]
27#[metrics(scope = "transaction_pool")]
28struct MaintainPoolConditionalMetrics {
29 removed_tx_conditional: Counter,
32}
33
34impl MaintainPoolConditionalMetrics {
35 #[inline]
36 fn inc_removed_tx_conditional(&self, count: usize) {
37 self.removed_tx_conditional.increment(count as u64);
38 }
39}
40
41#[derive(Metrics)]
43#[metrics(scope = "transaction_pool")]
44struct MaintainPoolInteropMetrics {
45 removed_tx_interop: Counter,
48 pooled_interop_transactions: Gauge,
50
51 stale_interop_transactions: Counter,
53 supervisor_revalidation_duration_seconds: Histogram,
56}
57
58impl MaintainPoolInteropMetrics {
59 #[inline]
60 fn inc_removed_tx_interop(&self, count: usize) {
61 self.removed_tx_interop.increment(count as u64);
62 }
63 #[inline]
64 fn set_interop_txs_in_pool(&self, count: usize) {
65 self.pooled_interop_transactions.set(count as f64);
66 }
67
68 #[inline]
69 fn inc_stale_tx_interop(&self, count: usize) {
70 self.stale_interop_transactions.increment(count as u64);
71 }
72
73 #[inline]
75 fn record_supervisor_duration(&self, duration: std::time::Duration) {
76 self.supervisor_revalidation_duration_seconds.record(duration.as_secs_f64());
77 }
78}
79pub fn maintain_transaction_pool_conditional_future<N, Pool, St>(
82 pool: Pool,
83 events: St,
84) -> BoxFuture<'static, ()>
85where
86 N: NodePrimitives,
87 Pool: TransactionPool + 'static,
88 Pool::Transaction: MaybeConditionalTransaction,
89 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
90{
91 async move {
92 maintain_transaction_pool_conditional(pool, events).await;
93 }
94 .boxed()
95}
96
97pub async fn maintain_transaction_pool_conditional<N, Pool, St>(pool: Pool, mut events: St)
103where
104 N: NodePrimitives,
105 Pool: TransactionPool,
106 Pool::Transaction: MaybeConditionalTransaction,
107 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
108{
109 let metrics = MaintainPoolConditionalMetrics::default();
110 loop {
111 let Some(event) = events.next().await else { break };
112 if let CanonStateNotification::Commit { new } = event {
113 let block_attr = BlockConditionalAttributes {
114 number: new.tip().number(),
115 timestamp: new.tip().timestamp(),
116 };
117 let mut to_remove = Vec::new();
118 for tx in &pool.pooled_transactions() {
119 if tx.transaction.has_exceeded_block_attributes(&block_attr) {
120 to_remove.push(*tx.hash());
121 }
122 }
123 if !to_remove.is_empty() {
124 let removed = pool.remove_transactions(to_remove);
125 metrics.inc_removed_tx_conditional(removed.len());
126 }
127 }
128 }
129}
130
131pub fn maintain_transaction_pool_interop_future<N, Pool, St>(
133 pool: Pool,
134 events: St,
135 supervisor_client: SupervisorClient,
136) -> BoxFuture<'static, ()>
137where
138 N: NodePrimitives,
139 Pool: TransactionPool + 'static,
140 Pool::Transaction: MaybeInteropTransaction,
141 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
142{
143 async move {
144 maintain_transaction_pool_interop(pool, events, supervisor_client).await;
145 }
146 .boxed()
147}
148
149pub async fn maintain_transaction_pool_interop<N, Pool, St>(
154 pool: Pool,
155 mut events: St,
156 supervisor_client: SupervisorClient,
157) where
158 N: NodePrimitives,
159 Pool: TransactionPool,
160 Pool::Transaction: MaybeInteropTransaction,
161 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
162{
163 let metrics = MaintainPoolInteropMetrics::default();
164
165 loop {
166 let Some(event) = events.next().await else { break };
167 if let CanonStateNotification::Commit { new } = event {
168 let timestamp = new.tip().timestamp();
169 let mut to_remove = Vec::new();
170 let mut to_revalidate = Vec::new();
171 let mut interop_count = 0;
172
173 for pooled_tx in pool.pooled_transactions() {
175 if let Some(interop_deadline_val) = pooled_tx.transaction.interop_deadline() {
176 interop_count += 1;
177 if !is_valid_interop(interop_deadline_val, timestamp) {
178 to_remove.push(*pooled_tx.transaction.hash());
179 } else if is_stale_interop(interop_deadline_val, timestamp, OFFSET_TIME) {
180 to_revalidate.push(pooled_tx.transaction.clone());
181 }
182 }
183 }
184
185 metrics.set_interop_txs_in_pool(interop_count);
186
187 if !to_revalidate.is_empty() {
188 metrics.inc_stale_tx_interop(to_revalidate.len());
189
190 let revalidation_start = Instant::now();
191 let revalidation_stream = supervisor_client.revalidate_interop_txs_stream(
192 to_revalidate,
193 timestamp,
194 TRANSACTION_VALIDITY_WINDOW,
195 MAX_SUPERVISOR_QUERIES,
196 );
197
198 futures_util::pin_mut!(revalidation_stream);
199
200 while let Some((tx_item_from_stream, validation_result)) =
201 revalidation_stream.next().await
202 {
203 match validation_result {
204 Some(Ok(())) => {
205 tx_item_from_stream
206 .set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW);
207 }
208 Some(Err(err)) => {
209 if err.is_bad_transaction() {
210 to_remove.push(*tx_item_from_stream.hash());
211 }
212 }
213 None => {
214 warn!(
215 target: "txpool",
216 hash = %tx_item_from_stream.hash(),
217 "Interop transaction no longer considered cross-chain during revalidation; removing."
218 );
219 to_remove.push(*tx_item_from_stream.hash());
220 }
221 }
222 }
223
224 metrics.record_supervisor_duration(revalidation_start.elapsed());
225 }
226
227 if !to_remove.is_empty() {
228 let removed = pool.remove_transactions(to_remove);
229 metrics.inc_removed_tx_interop(removed.len());
230 }
231 }
232 }
233}