reth_optimism_txpool/supervisor/
client.rs

1//! This is our custom implementation of validator struct
2
3use crate::{
4    interop::MaybeInteropTransaction,
5    supervisor::{
6        metrics::SupervisorMetrics, parse_access_list_items_to_inbox_entries, ExecutingDescriptor,
7        InteropTxValidatorError,
8    },
9    InvalidCrossTx,
10};
11use alloy_consensus::Transaction;
12use alloy_eips::eip2930::AccessList;
13use alloy_primitives::{TxHash, B256};
14use alloy_rpc_client::ReqwestClient;
15use futures_util::{
16    future::BoxFuture,
17    stream::{self, StreamExt},
18    Stream,
19};
20use op_alloy_consensus::interop::SafetyLevel;
21use reth_transaction_pool::PoolTransaction;
22use std::{
23    borrow::Cow,
24    future::IntoFuture,
25    sync::Arc,
26    time::{Duration, Instant},
27};
28use tracing::trace;
29
30/// Supervisor hosted by op-labs
31// TODO: This should be changed to actual supervisor url
32pub const DEFAULT_SUPERVISOR_URL: &str = "http://localhost:1337/";
33
34/// The default request timeout to use
35pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
36
37/// Implementation of the supervisor trait for the interop.
38#[derive(Debug, Clone)]
39pub struct SupervisorClient {
40    /// Stores type's data.
41    inner: Arc<SupervisorClientInner>,
42}
43
44impl SupervisorClient {
45    /// Returns a new [`SupervisorClientBuilder`].
46    pub fn builder(supervisor_endpoint: impl Into<String>) -> SupervisorClientBuilder {
47        SupervisorClientBuilder::new(supervisor_endpoint)
48    }
49
50    /// Returns configured timeout. See [`SupervisorClientInner`].
51    pub fn timeout(&self) -> Duration {
52        self.inner.timeout
53    }
54
55    /// Returns configured minimum safety level. See [`SupervisorClient`].
56    pub fn safety(&self) -> SafetyLevel {
57        self.inner.safety
58    }
59
60    /// Executes a `supervisor_checkAccessList` with the configured safety level.
61    pub fn check_access_list<'a>(
62        &self,
63        inbox_entries: &'a [B256],
64        executing_descriptor: ExecutingDescriptor,
65    ) -> CheckAccessListRequest<'a> {
66        CheckAccessListRequest {
67            client: self.inner.client.clone(),
68            inbox_entries: Cow::Borrowed(inbox_entries),
69            executing_descriptor,
70            timeout: self.inner.timeout,
71            safety: self.inner.safety,
72            metrics: self.inner.metrics.clone(),
73        }
74    }
75
76    /// Extracts commitment from access list entries, pointing to 0x420..022 and validates them
77    /// against supervisor.
78    ///
79    /// If commitment present pre-interop tx rejected.
80    ///
81    /// Returns:
82    /// None - if tx is not cross chain,
83    /// Some(Ok(()) - if tx is valid cross chain,
84    /// Some(Err(e)) - if tx is not valid or interop is not active
85    pub async fn is_valid_cross_tx(
86        &self,
87        access_list: Option<&AccessList>,
88        hash: &TxHash,
89        timestamp: u64,
90        timeout: Option<u64>,
91        is_interop_active: bool,
92    ) -> Option<Result<(), InvalidCrossTx>> {
93        // We don't need to check for deposit transaction in here, because they won't come from
94        // txpool
95        let access_list = access_list?;
96        let inbox_entries = parse_access_list_items_to_inbox_entries(access_list.iter())
97            .copied()
98            .collect::<Vec<_>>();
99        if inbox_entries.is_empty() {
100            return None;
101        }
102
103        // Interop check
104        if !is_interop_active {
105            // No cross chain tx allowed before interop
106            return Some(Err(InvalidCrossTx::CrossChainTxPreInterop))
107        }
108
109        if let Err(err) = self
110            .check_access_list(
111                inbox_entries.as_slice(),
112                ExecutingDescriptor::new(timestamp, timeout),
113            )
114            .await
115        {
116            self.inner.metrics.increment_metrics_for_error(&err);
117            trace!(target: "txpool", hash=%hash, err=%err, "Cross chain transaction invalid");
118            return Some(Err(InvalidCrossTx::ValidationError(err)));
119        }
120        Some(Ok(()))
121    }
122
123    /// Creates a stream that revalidates interop transactions against the supervisor.
124    /// Returns
125    /// An implementation of `Stream` that is `Send`-able and tied to the lifetime `'a` of `self`.
126    /// Each item yielded by the stream is a tuple `(TItem, Option<Result<(), InvalidCrossTx>>)`.
127    ///   - The first element is the original `TItem` that was revalidated.
128    ///   - The second element is the `Option<Result<(), InvalidCrossTx>>` describes the outcome
129    ///     - `None`: Transaction was not identified as a cross-chain candidate by initial checks.
130    ///     - `Some(Ok(()))`: Supervisor confirmed the transaction is valid.
131    ///     - `Some(Err(InvalidCrossTx))`: Supervisor indicated the transaction is invalid.
132    pub fn revalidate_interop_txs_stream<'a, TItem, InputIter>(
133        &'a self,
134        txs_to_revalidate: InputIter,
135        current_timestamp: u64,
136        revalidation_window: u64,
137        max_concurrent_queries: usize,
138    ) -> impl Stream<Item = (TItem, Option<Result<(), InvalidCrossTx>>)> + Send + 'a
139    where
140        InputIter: IntoIterator<Item = TItem> + Send + 'a,
141        InputIter::IntoIter: Send + 'a,
142        TItem:
143            MaybeInteropTransaction + PoolTransaction + Transaction + Clone + Send + Sync + 'static,
144    {
145        stream::iter(txs_to_revalidate.into_iter().map(move |tx_item| {
146            let client_for_async_task = self.clone();
147
148            async move {
149                let validation_result = client_for_async_task
150                    .is_valid_cross_tx(
151                        tx_item.access_list(),
152                        tx_item.hash(),
153                        current_timestamp,
154                        Some(revalidation_window),
155                        true,
156                    )
157                    .await;
158
159                // return the original transaction paired with its validation result.
160                (tx_item, validation_result)
161            }
162        }))
163        .buffered(max_concurrent_queries)
164    }
165}
166
167/// Holds supervisor data. Inner type of [`SupervisorClient`].
168#[derive(Debug, Clone)]
169pub struct SupervisorClientInner {
170    client: ReqwestClient,
171    /// The default
172    safety: SafetyLevel,
173    /// The default request timeout
174    timeout: Duration,
175    /// Metrics for tracking supervisor operations
176    metrics: SupervisorMetrics,
177}
178
179/// Builds [`SupervisorClient`].
180#[derive(Debug)]
181pub struct SupervisorClientBuilder {
182    /// Supervisor server's socket.
183    endpoint: String,
184    /// Timeout for requests.
185    ///
186    /// NOTE: this timeout is only effective if it's shorter than the timeout configured for the
187    /// underlying [`ReqwestClient`].
188    timeout: Duration,
189    /// Minimum [`SafetyLevel`] of cross-chain transactions accepted by this client.
190    safety: SafetyLevel,
191}
192
193impl SupervisorClientBuilder {
194    /// Creates a new builder.
195    pub fn new(supervisor_endpoint: impl Into<String>) -> Self {
196        Self {
197            endpoint: supervisor_endpoint.into(),
198            timeout: DEFAULT_REQUEST_TIMEOUT,
199            safety: SafetyLevel::CrossUnsafe,
200        }
201    }
202
203    /// Configures a custom timeout
204    pub const fn timeout(mut self, timeout: Duration) -> Self {
205        self.timeout = timeout;
206        self
207    }
208
209    /// Sets minimum safety level to accept for cross chain transactions.
210    pub const fn minimum_safety(mut self, min_safety: SafetyLevel) -> Self {
211        self.safety = min_safety;
212        self
213    }
214
215    /// Creates a new supervisor validator.
216    pub async fn build(self) -> SupervisorClient {
217        let Self { endpoint, timeout, safety } = self;
218
219        let client = ReqwestClient::builder()
220            .connect(endpoint.as_str())
221            .await
222            .expect("building supervisor client");
223
224        SupervisorClient {
225            inner: Arc::new(SupervisorClientInner {
226                client,
227                safety,
228                timeout,
229                metrics: SupervisorMetrics::default(),
230            }),
231        }
232    }
233}
234
235/// A Request future that issues a `supervisor_checkAccessList` request.
236#[derive(Debug, Clone)]
237pub struct CheckAccessListRequest<'a> {
238    client: ReqwestClient,
239    inbox_entries: Cow<'a, [B256]>,
240    executing_descriptor: ExecutingDescriptor,
241    timeout: Duration,
242    safety: SafetyLevel,
243    metrics: SupervisorMetrics,
244}
245
246impl<'a> CheckAccessListRequest<'a> {
247    /// Configures the timeout to use for the request if any.
248    pub const fn with_timeout(mut self, timeout: Duration) -> Self {
249        self.timeout = timeout;
250        self
251    }
252
253    /// Configures the [`SafetyLevel`] for this request
254    pub const fn with_safety(mut self, safety: SafetyLevel) -> Self {
255        self.safety = safety;
256        self
257    }
258}
259
260impl<'a> IntoFuture for CheckAccessListRequest<'a> {
261    type Output = Result<(), InteropTxValidatorError>;
262    type IntoFuture = BoxFuture<'a, Self::Output>;
263
264    fn into_future(self) -> Self::IntoFuture {
265        let Self { client, inbox_entries, executing_descriptor, timeout, safety, metrics } = self;
266        Box::pin(async move {
267            let start = Instant::now();
268
269            let result = tokio::time::timeout(
270                timeout,
271                client.request(
272                    "supervisor_checkAccessList",
273                    (inbox_entries, safety, executing_descriptor),
274                ),
275            )
276            .await;
277            metrics.record_supervisor_query(start.elapsed());
278
279            result
280                .map_err(|_| InteropTxValidatorError::Timeout(timeout.as_secs()))?
281                .map_err(InteropTxValidatorError::from_json_rpc)
282        })
283    }
284}