reth_optimism_txpool/supervisor/
client.rs1use crate::{
4 interop::MaybeInteropTransaction,
5 supervisor::{
6 metrics::SupervisorMetrics, parse_access_list_items_to_inbox_entries, ExecutingDescriptor,
7 InteropTxValidatorError,
8 },
9 InvalidCrossTx,
10};
11use alloy_consensus::Transaction;
12use alloy_eips::eip2930::AccessList;
13use alloy_primitives::{TxHash, B256};
14use alloy_rpc_client::ReqwestClient;
15use futures_util::{
16 future::BoxFuture,
17 stream::{self, StreamExt},
18 Stream,
19};
20use op_alloy_consensus::interop::SafetyLevel;
21use reth_transaction_pool::PoolTransaction;
22use std::{
23 borrow::Cow,
24 future::IntoFuture,
25 sync::Arc,
26 time::{Duration, Instant},
27};
28use tracing::trace;
29
/// Default supervisor endpoint URL: port 1337 on `localhost`.
pub const DEFAULT_SUPERVISOR_URL: &str = "http://localhost:1337/";
33
/// Default per-request timeout (100 milliseconds).
///
/// [`SupervisorClientBuilder::new`] starts from this value.
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
36
37#[derive(Debug, Clone)]
39pub struct SupervisorClient {
40 inner: Arc<SupervisorClientInner>,
42}
43
44impl SupervisorClient {
45 pub fn builder(supervisor_endpoint: impl Into<String>) -> SupervisorClientBuilder {
47 SupervisorClientBuilder::new(supervisor_endpoint)
48 }
49
50 pub fn timeout(&self) -> Duration {
52 self.inner.timeout
53 }
54
55 pub fn safety(&self) -> SafetyLevel {
57 self.inner.safety
58 }
59
60 pub fn check_access_list<'a>(
62 &self,
63 inbox_entries: &'a [B256],
64 executing_descriptor: ExecutingDescriptor,
65 ) -> CheckAccessListRequest<'a> {
66 CheckAccessListRequest {
67 client: self.inner.client.clone(),
68 inbox_entries: Cow::Borrowed(inbox_entries),
69 executing_descriptor,
70 timeout: self.inner.timeout,
71 safety: self.inner.safety,
72 metrics: self.inner.metrics.clone(),
73 }
74 }
75
76 pub async fn is_valid_cross_tx(
86 &self,
87 access_list: Option<&AccessList>,
88 hash: &TxHash,
89 timestamp: u64,
90 timeout: Option<u64>,
91 is_interop_active: bool,
92 ) -> Option<Result<(), InvalidCrossTx>> {
93 let access_list = access_list?;
96 let inbox_entries = parse_access_list_items_to_inbox_entries(access_list.iter())
97 .copied()
98 .collect::<Vec<_>>();
99 if inbox_entries.is_empty() {
100 return None;
101 }
102
103 if !is_interop_active {
105 return Some(Err(InvalidCrossTx::CrossChainTxPreInterop))
107 }
108
109 if let Err(err) = self
110 .check_access_list(
111 inbox_entries.as_slice(),
112 ExecutingDescriptor::new(timestamp, timeout),
113 )
114 .await
115 {
116 self.inner.metrics.increment_metrics_for_error(&err);
117 trace!(target: "txpool", hash=%hash, err=%err, "Cross chain transaction invalid");
118 return Some(Err(InvalidCrossTx::ValidationError(err)));
119 }
120 Some(Ok(()))
121 }
122
123 pub fn revalidate_interop_txs_stream<'a, TItem, InputIter>(
133 &'a self,
134 txs_to_revalidate: InputIter,
135 current_timestamp: u64,
136 revalidation_window: u64,
137 max_concurrent_queries: usize,
138 ) -> impl Stream<Item = (TItem, Option<Result<(), InvalidCrossTx>>)> + Send + 'a
139 where
140 InputIter: IntoIterator<Item = TItem> + Send + 'a,
141 InputIter::IntoIter: Send + 'a,
142 TItem:
143 MaybeInteropTransaction + PoolTransaction + Transaction + Clone + Send + Sync + 'static,
144 {
145 stream::iter(txs_to_revalidate.into_iter().map(move |tx_item| {
146 let client_for_async_task = self.clone();
147
148 async move {
149 let validation_result = client_for_async_task
150 .is_valid_cross_tx(
151 tx_item.access_list(),
152 tx_item.hash(),
153 current_timestamp,
154 Some(revalidation_window),
155 true,
156 )
157 .await;
158
159 (tx_item, validation_result)
161 }
162 }))
163 .buffered(max_concurrent_queries)
164 }
165}
166
167#[derive(Debug, Clone)]
169pub struct SupervisorClientInner {
170 client: ReqwestClient,
171 safety: SafetyLevel,
173 timeout: Duration,
175 metrics: SupervisorMetrics,
177}
178
179#[derive(Debug)]
181pub struct SupervisorClientBuilder {
182 endpoint: String,
184 timeout: Duration,
189 safety: SafetyLevel,
191}
192
193impl SupervisorClientBuilder {
194 pub fn new(supervisor_endpoint: impl Into<String>) -> Self {
196 Self {
197 endpoint: supervisor_endpoint.into(),
198 timeout: DEFAULT_REQUEST_TIMEOUT,
199 safety: SafetyLevel::CrossUnsafe,
200 }
201 }
202
203 pub const fn timeout(mut self, timeout: Duration) -> Self {
205 self.timeout = timeout;
206 self
207 }
208
209 pub const fn minimum_safety(mut self, min_safety: SafetyLevel) -> Self {
211 self.safety = min_safety;
212 self
213 }
214
215 pub async fn build(self) -> SupervisorClient {
217 let Self { endpoint, timeout, safety } = self;
218
219 let client = ReqwestClient::builder()
220 .connect(endpoint.as_str())
221 .await
222 .expect("building supervisor client");
223
224 SupervisorClient {
225 inner: Arc::new(SupervisorClientInner {
226 client,
227 safety,
228 timeout,
229 metrics: SupervisorMetrics::default(),
230 }),
231 }
232 }
233}
234
235#[derive(Debug, Clone)]
237pub struct CheckAccessListRequest<'a> {
238 client: ReqwestClient,
239 inbox_entries: Cow<'a, [B256]>,
240 executing_descriptor: ExecutingDescriptor,
241 timeout: Duration,
242 safety: SafetyLevel,
243 metrics: SupervisorMetrics,
244}
245
246impl<'a> CheckAccessListRequest<'a> {
247 pub const fn with_timeout(mut self, timeout: Duration) -> Self {
249 self.timeout = timeout;
250 self
251 }
252
253 pub const fn with_safety(mut self, safety: SafetyLevel) -> Self {
255 self.safety = safety;
256 self
257 }
258}
259
260impl<'a> IntoFuture for CheckAccessListRequest<'a> {
261 type Output = Result<(), InteropTxValidatorError>;
262 type IntoFuture = BoxFuture<'a, Self::Output>;
263
264 fn into_future(self) -> Self::IntoFuture {
265 let Self { client, inbox_entries, executing_descriptor, timeout, safety, metrics } = self;
266 Box::pin(async move {
267 let start = Instant::now();
268
269 let result = tokio::time::timeout(
270 timeout,
271 client.request(
272 "supervisor_checkAccessList",
273 (inbox_entries, safety, executing_descriptor),
274 ),
275 )
276 .await;
277 metrics.record_supervisor_query(start.elapsed());
278
279 result
280 .map_err(|_| InteropTxValidatorError::Timeout(timeout.as_secs()))?
281 .map_err(InteropTxValidatorError::from_json_rpc)
282 })
283 }
284}