reth_optimism_txpool/supervisor/
client.rs

//! Supervisor client used to validate cross-chain (interop) transactions.
2
3use crate::{
4    supervisor::{
5        metrics::SupervisorMetrics, parse_access_list_items_to_inbox_entries, ExecutingDescriptor,
6        InteropTxValidatorError,
7    },
8    InvalidCrossTx,
9};
10use alloy_consensus::Transaction;
11use alloy_eips::eip2930::AccessList;
12use alloy_primitives::{TxHash, B256};
13use alloy_rpc_client::ReqwestClient;
14use futures_util::{
15    future::BoxFuture,
16    stream::{self, StreamExt},
17    Stream,
18};
19use op_alloy_consensus::interop::SafetyLevel;
20use reth_transaction_pool::PoolTransaction;
21use std::{
22    borrow::Cow,
23    future::IntoFuture,
24    sync::Arc,
25    time::{Duration, Instant},
26};
27use tracing::trace;
28
/// Default supervisor endpoint URL.
///
/// NOTE: this currently points at a localhost placeholder, not a hosted supervisor.
// TODO: This should be changed to actual supervisor url
pub const DEFAULT_SUPERVISOR_URL: &str = "http://localhost:1337/";
32
/// The default request timeout to use (100 milliseconds).
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);
35
/// Client used to validate interop (cross-chain) transactions against a supervisor.
///
/// All state is kept behind an [`Arc`], so cloning the client only bumps a reference count.
#[derive(Debug, Clone)]
pub struct SupervisorClient {
    /// Shared state of the client: RPC client, safety level, timeout and metrics.
    inner: Arc<SupervisorClientInner>,
}
42
43impl SupervisorClient {
44    /// Returns a new [`SupervisorClientBuilder`].
45    pub fn builder(supervisor_endpoint: impl Into<String>) -> SupervisorClientBuilder {
46        SupervisorClientBuilder::new(supervisor_endpoint)
47    }
48
49    /// Returns configured timeout. See [`SupervisorClientInner`].
50    pub fn timeout(&self) -> Duration {
51        self.inner.timeout
52    }
53
54    /// Returns configured minimum safety level. See [`SupervisorClient`].
55    pub fn safety(&self) -> SafetyLevel {
56        self.inner.safety
57    }
58
59    /// Executes a `supervisor_checkAccessList` with the configured safety level.
60    pub fn check_access_list<'a>(
61        &self,
62        inbox_entries: &'a [B256],
63        executing_descriptor: ExecutingDescriptor,
64    ) -> CheckAccessListRequest<'a> {
65        CheckAccessListRequest {
66            client: self.inner.client.clone(),
67            inbox_entries: Cow::Borrowed(inbox_entries),
68            executing_descriptor,
69            timeout: self.inner.timeout,
70            safety: self.inner.safety,
71            metrics: self.inner.metrics.clone(),
72        }
73    }
74
75    /// Extracts commitment from access list entries, pointing to 0x420..022 and validates them
76    /// against supervisor.
77    ///
78    /// If commitment present pre-interop tx rejected.
79    ///
80    /// Returns:
81    /// None - if tx is not cross chain,
82    /// Some(Ok(()) - if tx is valid cross chain,
83    /// Some(Err(e)) - if tx is not valid or interop is not active
84    pub async fn is_valid_cross_tx(
85        &self,
86        access_list: Option<&AccessList>,
87        hash: &TxHash,
88        timestamp: u64,
89        timeout: Option<u64>,
90        is_interop_active: bool,
91    ) -> Option<Result<(), InvalidCrossTx>> {
92        // We don't need to check for deposit transaction in here, because they won't come from
93        // txpool
94        let access_list = access_list?;
95        let inbox_entries = parse_access_list_items_to_inbox_entries(access_list.iter())
96            .copied()
97            .collect::<Vec<_>>();
98        if inbox_entries.is_empty() {
99            return None;
100        }
101
102        // Interop check
103        if !is_interop_active {
104            // No cross chain tx allowed before interop
105            return Some(Err(InvalidCrossTx::CrossChainTxPreInterop))
106        }
107
108        if let Err(err) = self
109            .check_access_list(
110                inbox_entries.as_slice(),
111                ExecutingDescriptor::new(timestamp, timeout),
112            )
113            .await
114        {
115            self.inner.metrics.increment_metrics_for_error(&err);
116            trace!(target: "txpool", hash=%hash, err=%err, "Cross chain transaction invalid");
117            return Some(Err(InvalidCrossTx::ValidationError(err)));
118        }
119        Some(Ok(()))
120    }
121
122    /// Creates a stream that revalidates interop transactions against the supervisor.
123    /// Returns
124    /// An implementation of `Stream` that is `Send`-able and tied to the lifetime `'a` of `self`.
125    /// Each item yielded by the stream is a tuple `(TItem, Option<Result<(), InvalidCrossTx>>)`.
126    ///   - The first element is the original `TItem` that was revalidated.
127    ///   - The second element is the `Option<Result<(), InvalidCrossTx>>` describes the outcome
128    ///     - `None`: Transaction was not identified as a cross-chain candidate by initial checks.
129    ///     - `Some(Ok(()))`: Supervisor confirmed the transaction is valid.
130    ///     - `Some(Err(InvalidCrossTx))`: Supervisor indicated the transaction is invalid.
131    pub fn revalidate_interop_txs_stream<'a, TItem, InputIter>(
132        &'a self,
133        txs_to_revalidate: InputIter,
134        current_timestamp: u64,
135        revalidation_window: u64,
136        max_concurrent_queries: usize,
137    ) -> impl Stream<Item = (TItem, Option<Result<(), InvalidCrossTx>>)> + Send + 'a
138    where
139        InputIter: IntoIterator<Item = TItem> + Send + 'a,
140        InputIter::IntoIter: Send + 'a,
141        TItem: PoolTransaction + Transaction + Send,
142    {
143        stream::iter(txs_to_revalidate.into_iter().map(move |tx_item| {
144            let client_for_async_task = self.clone();
145
146            async move {
147                let validation_result = client_for_async_task
148                    .is_valid_cross_tx(
149                        tx_item.access_list(),
150                        tx_item.hash(),
151                        current_timestamp,
152                        Some(revalidation_window),
153                        true,
154                    )
155                    .await;
156
157                // return the original transaction paired with its validation result.
158                (tx_item, validation_result)
159            }
160        }))
161        .buffered(max_concurrent_queries)
162    }
163}
164
/// Holds supervisor data. Inner type of [`SupervisorClient`].
#[derive(Debug, Clone)]
pub struct SupervisorClientInner {
    /// RPC client used to send requests to the supervisor.
    client: ReqwestClient,
    /// The default [`SafetyLevel`] sent with requests
    safety: SafetyLevel,
    /// The default request timeout
    timeout: Duration,
    /// Metrics for tracking supervisor operations
    metrics: SupervisorMetrics,
}
176
/// Builds [`SupervisorClient`].
#[derive(Debug)]
pub struct SupervisorClientBuilder {
    /// Endpoint of the supervisor server the client connects to.
    endpoint: String,
    /// Timeout for requests.
    ///
    /// NOTE: this timeout is only effective if it's shorter than the timeout configured for the
    /// underlying [`ReqwestClient`].
    timeout: Duration,
    /// Minimum [`SafetyLevel`] of cross-chain transactions accepted by this client.
    safety: SafetyLevel,
}
190
191impl SupervisorClientBuilder {
192    /// Creates a new builder.
193    pub fn new(supervisor_endpoint: impl Into<String>) -> Self {
194        Self {
195            endpoint: supervisor_endpoint.into(),
196            timeout: DEFAULT_REQUEST_TIMEOUT,
197            safety: SafetyLevel::CrossUnsafe,
198        }
199    }
200
201    /// Configures a custom timeout
202    pub const fn timeout(mut self, timeout: Duration) -> Self {
203        self.timeout = timeout;
204        self
205    }
206
207    /// Sets minimum safety level to accept for cross chain transactions.
208    pub const fn minimum_safety(mut self, min_safety: SafetyLevel) -> Self {
209        self.safety = min_safety;
210        self
211    }
212
213    /// Creates a new supervisor validator.
214    pub async fn build(self) -> SupervisorClient {
215        let Self { endpoint, timeout, safety } = self;
216
217        let client = ReqwestClient::builder()
218            .connect(endpoint.as_str())
219            .await
220            .expect("building supervisor client");
221
222        SupervisorClient {
223            inner: Arc::new(SupervisorClientInner {
224                client,
225                safety,
226                timeout,
227                metrics: SupervisorMetrics::default(),
228            }),
229        }
230    }
231}
232
/// A Request future that issues a `supervisor_checkAccessList` request.
///
/// The request is sent when this value is awaited (see the [`IntoFuture`] implementation).
#[derive(Debug, Clone)]
pub struct CheckAccessListRequest<'a> {
    /// RPC client the request is sent with.
    client: ReqwestClient,
    /// Inbox entries to check; borrowed or owned.
    inbox_entries: Cow<'a, [B256]>,
    /// Executing descriptor sent as a request parameter.
    executing_descriptor: ExecutingDescriptor,
    /// Maximum time to wait for the supervisor's response.
    timeout: Duration,
    /// [`SafetyLevel`] sent as a request parameter.
    safety: SafetyLevel,
    /// Metrics used to record the duration of the query.
    metrics: SupervisorMetrics,
}
243
244impl<'a> CheckAccessListRequest<'a> {
245    /// Configures the timeout to use for the request if any.
246    pub const fn with_timeout(mut self, timeout: Duration) -> Self {
247        self.timeout = timeout;
248        self
249    }
250
251    /// Configures the [`SafetyLevel`] for this request
252    pub const fn with_safety(mut self, safety: SafetyLevel) -> Self {
253        self.safety = safety;
254        self
255    }
256}
257
258impl<'a> IntoFuture for CheckAccessListRequest<'a> {
259    type Output = Result<(), InteropTxValidatorError>;
260    type IntoFuture = BoxFuture<'a, Self::Output>;
261
262    fn into_future(self) -> Self::IntoFuture {
263        let Self { client, inbox_entries, executing_descriptor, timeout, safety, metrics } = self;
264        Box::pin(async move {
265            let start = Instant::now();
266
267            let result = tokio::time::timeout(
268                timeout,
269                client.request(
270                    "supervisor_checkAccessList",
271                    (inbox_entries, safety, executing_descriptor),
272                ),
273            )
274            .await;
275            metrics.record_supervisor_query(start.elapsed());
276
277            result
278                .map_err(|_| InteropTxValidatorError::Timeout(timeout.as_secs()))?
279                .map_err(InteropTxValidatorError::from_json_rpc)
280        })
281    }
282}