reth_optimism_txpool/supervisor/
client.rs1use crate::{
4 supervisor::{
5 metrics::SupervisorMetrics, parse_access_list_items_to_inbox_entries, ExecutingDescriptor,
6 InteropTxValidatorError,
7 },
8 InvalidCrossTx,
9};
10use alloy_consensus::Transaction;
11use alloy_eips::eip2930::AccessList;
12use alloy_primitives::{TxHash, B256};
13use alloy_rpc_client::ReqwestClient;
14use futures_util::{
15 future::BoxFuture,
16 stream::{self, StreamExt},
17 Stream,
18};
19use op_alloy_consensus::interop::SafetyLevel;
20use reth_transaction_pool::PoolTransaction;
21use std::{
22 borrow::Cow,
23 future::IntoFuture,
24 sync::Arc,
25 time::{Duration, Instant},
26};
27use tracing::trace;
28
/// Default URL of the supervisor endpoint (a local service on port 1337).
pub const DEFAULT_SUPERVISOR_URL: &str = "http://localhost:1337/";
32
/// Default timeout for a single supervisor request: 100 milliseconds.
///
/// [`SupervisorClientBuilder::new`] starts from this value.
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
35
36#[derive(Debug, Clone)]
38pub struct SupervisorClient {
39 inner: Arc<SupervisorClientInner>,
41}
42
43impl SupervisorClient {
44 pub fn builder(supervisor_endpoint: impl Into<String>) -> SupervisorClientBuilder {
46 SupervisorClientBuilder::new(supervisor_endpoint)
47 }
48
49 pub fn timeout(&self) -> Duration {
51 self.inner.timeout
52 }
53
54 pub fn safety(&self) -> SafetyLevel {
56 self.inner.safety
57 }
58
59 pub fn check_access_list<'a>(
61 &self,
62 inbox_entries: &'a [B256],
63 executing_descriptor: ExecutingDescriptor,
64 ) -> CheckAccessListRequest<'a> {
65 CheckAccessListRequest {
66 client: self.inner.client.clone(),
67 inbox_entries: Cow::Borrowed(inbox_entries),
68 executing_descriptor,
69 timeout: self.inner.timeout,
70 safety: self.inner.safety,
71 metrics: self.inner.metrics.clone(),
72 }
73 }
74
75 pub async fn is_valid_cross_tx(
85 &self,
86 access_list: Option<&AccessList>,
87 hash: &TxHash,
88 timestamp: u64,
89 timeout: Option<u64>,
90 is_interop_active: bool,
91 ) -> Option<Result<(), InvalidCrossTx>> {
92 let access_list = access_list?;
95 let inbox_entries = parse_access_list_items_to_inbox_entries(access_list.iter())
96 .copied()
97 .collect::<Vec<_>>();
98 if inbox_entries.is_empty() {
99 return None;
100 }
101
102 if !is_interop_active {
104 return Some(Err(InvalidCrossTx::CrossChainTxPreInterop))
106 }
107
108 if let Err(err) = self
109 .check_access_list(
110 inbox_entries.as_slice(),
111 ExecutingDescriptor::new(timestamp, timeout),
112 )
113 .await
114 {
115 self.inner.metrics.increment_metrics_for_error(&err);
116 trace!(target: "txpool", hash=%hash, err=%err, "Cross chain transaction invalid");
117 return Some(Err(InvalidCrossTx::ValidationError(err)));
118 }
119 Some(Ok(()))
120 }
121
122 pub fn revalidate_interop_txs_stream<'a, TItem, InputIter>(
132 &'a self,
133 txs_to_revalidate: InputIter,
134 current_timestamp: u64,
135 revalidation_window: u64,
136 max_concurrent_queries: usize,
137 ) -> impl Stream<Item = (TItem, Option<Result<(), InvalidCrossTx>>)> + Send + 'a
138 where
139 InputIter: IntoIterator<Item = TItem> + Send + 'a,
140 InputIter::IntoIter: Send + 'a,
141 TItem: PoolTransaction + Transaction + Send,
142 {
143 stream::iter(txs_to_revalidate.into_iter().map(move |tx_item| {
144 let client_for_async_task = self.clone();
145
146 async move {
147 let validation_result = client_for_async_task
148 .is_valid_cross_tx(
149 tx_item.access_list(),
150 tx_item.hash(),
151 current_timestamp,
152 Some(revalidation_window),
153 true,
154 )
155 .await;
156
157 (tx_item, validation_result)
159 }
160 }))
161 .buffered(max_concurrent_queries)
162 }
163}
164
165#[derive(Debug, Clone)]
167pub struct SupervisorClientInner {
168 client: ReqwestClient,
169 safety: SafetyLevel,
171 timeout: Duration,
173 metrics: SupervisorMetrics,
175}
176
177#[derive(Debug)]
179pub struct SupervisorClientBuilder {
180 endpoint: String,
182 timeout: Duration,
187 safety: SafetyLevel,
189}
190
191impl SupervisorClientBuilder {
192 pub fn new(supervisor_endpoint: impl Into<String>) -> Self {
194 Self {
195 endpoint: supervisor_endpoint.into(),
196 timeout: DEFAULT_REQUEST_TIMEOUT,
197 safety: SafetyLevel::CrossUnsafe,
198 }
199 }
200
201 pub const fn timeout(mut self, timeout: Duration) -> Self {
203 self.timeout = timeout;
204 self
205 }
206
207 pub const fn minimum_safety(mut self, min_safety: SafetyLevel) -> Self {
209 self.safety = min_safety;
210 self
211 }
212
213 pub async fn build(self) -> SupervisorClient {
215 let Self { endpoint, timeout, safety } = self;
216
217 let client = ReqwestClient::builder()
218 .connect(endpoint.as_str())
219 .await
220 .expect("building supervisor client");
221
222 SupervisorClient {
223 inner: Arc::new(SupervisorClientInner {
224 client,
225 safety,
226 timeout,
227 metrics: SupervisorMetrics::default(),
228 }),
229 }
230 }
231}
232
233#[derive(Debug, Clone)]
235pub struct CheckAccessListRequest<'a> {
236 client: ReqwestClient,
237 inbox_entries: Cow<'a, [B256]>,
238 executing_descriptor: ExecutingDescriptor,
239 timeout: Duration,
240 safety: SafetyLevel,
241 metrics: SupervisorMetrics,
242}
243
244impl<'a> CheckAccessListRequest<'a> {
245 pub const fn with_timeout(mut self, timeout: Duration) -> Self {
247 self.timeout = timeout;
248 self
249 }
250
251 pub const fn with_safety(mut self, safety: SafetyLevel) -> Self {
253 self.safety = safety;
254 self
255 }
256}
257
258impl<'a> IntoFuture for CheckAccessListRequest<'a> {
259 type Output = Result<(), InteropTxValidatorError>;
260 type IntoFuture = BoxFuture<'a, Self::Output>;
261
262 fn into_future(self) -> Self::IntoFuture {
263 let Self { client, inbox_entries, executing_descriptor, timeout, safety, metrics } = self;
264 Box::pin(async move {
265 let start = Instant::now();
266
267 let result = tokio::time::timeout(
268 timeout,
269 client.request(
270 "supervisor_checkAccessList",
271 (inbox_entries, safety, executing_descriptor),
272 ),
273 )
274 .await;
275 metrics.record_supervisor_query(start.elapsed());
276
277 result
278 .map_err(|_| InteropTxValidatorError::Timeout(timeout.as_secs()))?
279 .map_err(InteropTxValidatorError::from_json_rpc)
280 })
281 }
282}