reth_optimism_rpc/
sequencer.rs

1//! Helpers for optimism specific RPC implementations.
2
3use crate::SequencerClientError;
4use alloy_json_rpc::{RpcRecv, RpcSend};
5use alloy_primitives::{hex, B256};
6use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, RpcClient as Client};
7use alloy_rpc_types_eth::erc4337::TransactionConditional;
8use alloy_transport_http::Http;
9use reth_optimism_txpool::supervisor::metrics::SequencerMetrics;
10use std::{str::FromStr, sync::Arc, time::Instant};
11use thiserror::Error;
12use tracing::warn;
13
14/// Sequencer client error
15#[derive(Error, Debug)]
16pub enum Error {
17    /// Invalid scheme
18    #[error("Invalid scheme of sequencer url: {0}")]
19    InvalidScheme(String),
20    /// Invalid url
21    #[error("Invalid sequencer url: {0}")]
22    InvalidUrl(String),
23    /// Establishing a connection to the sequencer endpoint resulted in an error.
24    #[error("Failed to connect to sequencer: {0}")]
25    TransportError(
26        #[from]
27        #[source]
28        alloy_transport::TransportError,
29    ),
30    /// Reqwest failed to init client
31    #[error("Failed to init reqwest client for sequencer: {0}")]
32    ReqwestError(
33        #[from]
34        #[source]
35        reqwest::Error,
36    ),
37}
38
39/// A client to interact with a Sequencer
40#[derive(Debug, Clone)]
41pub struct SequencerClient {
42    inner: Arc<SequencerClientInner>,
43}
44
45impl SequencerClientInner {
46    /// Creates a new instance with the given endpoint and client.
47    pub(crate) fn new(sequencer_endpoint: String, client: Client) -> Self {
48        let metrics = SequencerMetrics::default();
49        Self { sequencer_endpoint, client, metrics }
50    }
51}
52
53impl SequencerClient {
54    /// Creates a new [`SequencerClient`] for the given URL.
55    ///
56    /// If the URL is a websocket endpoint we connect a websocket instance.
57    pub async fn new(sequencer_endpoint: impl Into<String>) -> Result<Self, Error> {
58        let sequencer_endpoint = sequencer_endpoint.into();
59        let endpoint = BuiltInConnectionString::from_str(&sequencer_endpoint)?;
60        if let BuiltInConnectionString::Http(url) = endpoint {
61            let client = reqwest::Client::builder()
62                // we force use tls to prevent native issues
63                .use_rustls_tls()
64                .build()?;
65            Self::with_http_client(url, client)
66        } else {
67            let client = ClientBuilder::default().connect_with(endpoint).await?;
68            let inner = SequencerClientInner::new(sequencer_endpoint, client);
69            Ok(Self { inner: Arc::new(inner) })
70        }
71    }
72
73    /// Creates a new [`SequencerClient`] with http transport with the given http client.
74    pub fn with_http_client(
75        sequencer_endpoint: impl Into<String>,
76        client: reqwest::Client,
77    ) -> Result<Self, Error> {
78        let sequencer_endpoint: String = sequencer_endpoint.into();
79        let url = sequencer_endpoint
80            .parse()
81            .map_err(|_| Error::InvalidUrl(sequencer_endpoint.clone()))?;
82
83        let http_client = Http::with_client(client, url);
84        let is_local = http_client.guess_local();
85        let client = ClientBuilder::default().transport(http_client, is_local);
86
87        let inner = SequencerClientInner::new(sequencer_endpoint, client);
88        Ok(Self { inner: Arc::new(inner) })
89    }
90
91    /// Returns the network of the client
92    pub fn endpoint(&self) -> &str {
93        &self.inner.sequencer_endpoint
94    }
95
96    /// Returns the client
97    pub fn client(&self) -> &Client {
98        &self.inner.client
99    }
100
101    /// Returns a reference to the [`SequencerMetrics`] for tracking client metrics.
102    fn metrics(&self) -> &SequencerMetrics {
103        &self.inner.metrics
104    }
105
106    /// Sends a [`alloy_rpc_client::RpcCall`] request to the sequencer endpoint.
107    pub async fn request<Params: RpcSend, Resp: RpcRecv>(
108        &self,
109        method: &str,
110        params: Params,
111    ) -> Result<Resp, SequencerClientError> {
112        let resp =
113            self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
114                |err| {
115                    warn!(
116                        target: "rpc::sequencer",
117                        %err,
118                        "HTTP request to sequencer failed",
119                    );
120                },
121            )?;
122        Ok(resp)
123    }
124
125    /// Forwards a transaction to the sequencer endpoint.
126    pub async fn forward_raw_transaction(&self, tx: &[u8]) -> Result<B256, SequencerClientError> {
127        let start = Instant::now();
128        let rlp_hex = hex::encode_prefixed(tx);
129        let tx_hash =
130            self.request("eth_sendRawTransaction", (rlp_hex,)).await.inspect_err(|err| {
131                warn!(
132                    target: "rpc::eth",
133                    %err,
134                    "Failed to forward transaction to sequencer",
135                );
136            })?;
137        self.metrics().record_forward_latency(start.elapsed());
138        Ok(tx_hash)
139    }
140
141    /// Forwards a transaction conditional to the sequencer endpoint.
142    pub async fn forward_raw_transaction_conditional(
143        &self,
144        tx: &[u8],
145        condition: TransactionConditional,
146    ) -> Result<B256, SequencerClientError> {
147        let start = Instant::now();
148        let rlp_hex = hex::encode_prefixed(tx);
149        let tx_hash = self
150            .request("eth_sendRawTransactionConditional", (rlp_hex, condition))
151            .await
152            .inspect_err(|err| {
153                warn!(
154                    target: "rpc::eth",
155                    %err,
156                    "Failed to forward transaction conditional for sequencer",
157                );
158            })?;
159        self.metrics().record_forward_latency(start.elapsed());
160        Ok(tx_hash)
161    }
162}
163
164#[derive(Debug)]
165struct SequencerClientInner {
166    /// The endpoint of the sequencer
167    sequencer_endpoint: String,
168    /// The client
169    client: Client,
170    // Metrics for tracking sequencer forwarding
171    metrics: SequencerMetrics,
172}
173
174#[cfg(test)]
175mod tests {
176    use super::*;
177    use alloy_primitives::U64;
178
179    #[tokio::test]
180    async fn test_http_body_str() {
181        let client = SequencerClient::new("http://localhost:8545").await.unwrap();
182
183        let request = client
184            .client()
185            .make_request("eth_getBlockByNumber", (U64::from(10),))
186            .serialize()
187            .unwrap()
188            .take_request();
189        let body = request.get();
190
191        assert_eq!(
192            body,
193            r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#
194        );
195
196        let condition = TransactionConditional::default();
197
198        let request = client
199            .client()
200            .make_request(
201                "eth_sendRawTransactionConditional",
202                (format!("0x{}", hex::encode("abcd")), condition),
203            )
204            .serialize()
205            .unwrap()
206            .take_request();
207        let body = request.get();
208
209        assert_eq!(
210            body,
211            r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#
212        );
213    }
214
215    #[tokio::test]
216    #[ignore = "Start if WS is reachable at ws://localhost:8546"]
217    async fn test_ws_body_str() {
218        let client = SequencerClient::new("ws://localhost:8546").await.unwrap();
219
220        let request = client
221            .client()
222            .make_request("eth_getBlockByNumber", (U64::from(10),))
223            .serialize()
224            .unwrap()
225            .take_request();
226        let body = request.get();
227
228        assert_eq!(
229            body,
230            r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#
231        );
232
233        let condition = TransactionConditional::default();
234
235        let request = client
236            .client()
237            .make_request(
238                "eth_sendRawTransactionConditional",
239                (format!("0x{}", hex::encode("abcd")), condition),
240            )
241            .serialize()
242            .unwrap()
243            .take_request();
244        let body = request.get();
245
246        assert_eq!(
247            body,
248            r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#
249        );
250    }
251}