reth_optimism_rpc/
sequencer.rs

1//! Helpers for optimism specific RPC implementations.
2
3use crate::SequencerClientError;
4use alloy_json_rpc::{RpcRecv, RpcSend};
5use alloy_primitives::{hex, B256};
6use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, RpcClient as Client};
7use alloy_rpc_types_eth::erc4337::TransactionConditional;
8use alloy_transport_http::Http;
9use reth_optimism_txpool::supervisor::metrics::SequencerMetrics;
10use std::{str::FromStr, sync::Arc, time::Instant};
11use thiserror::Error;
12use tracing::warn;
13
14/// Sequencer client error
15#[derive(Error, Debug)]
16pub enum Error {
17    /// Invalid scheme
18    #[error("Invalid scheme of sequencer url: {0}")]
19    InvalidScheme(String),
20    /// Invalid header or value provided.
21    #[error("Invalid header: {0}")]
22    InvalidHeader(String),
23    /// Invalid url
24    #[error("Invalid sequencer url: {0}")]
25    InvalidUrl(String),
26    /// Establishing a connection to the sequencer endpoint resulted in an error.
27    #[error("Failed to connect to sequencer: {0}")]
28    TransportError(
29        #[from]
30        #[source]
31        alloy_transport::TransportError,
32    ),
33    /// Reqwest failed to init client
34    #[error("Failed to init reqwest client for sequencer: {0}")]
35    ReqwestError(
36        #[from]
37        #[source]
38        reqwest::Error,
39    ),
40}
41
42/// A client to interact with a Sequencer
43#[derive(Debug, Clone)]
44pub struct SequencerClient {
45    inner: Arc<SequencerClientInner>,
46}
47
48impl SequencerClientInner {
49    /// Creates a new instance with the given endpoint and client.
50    pub(crate) fn new(sequencer_endpoint: String, client: Client) -> Self {
51        let metrics = SequencerMetrics::default();
52        Self { sequencer_endpoint, client, metrics }
53    }
54}
55
56impl SequencerClient {
57    /// Creates a new [`SequencerClient`] for the given URL.
58    ///
59    /// If the URL is a websocket endpoint we connect a websocket instance.
60    pub async fn new(sequencer_endpoint: impl Into<String>) -> Result<Self, Error> {
61        Self::new_with_headers(sequencer_endpoint, Default::default()).await
62    }
63
64    /// Creates a new `SequencerClient` for the given URL with the given headers
65    ///
66    /// This expects headers in the form: `header=value`
67    pub async fn new_with_headers(
68        sequencer_endpoint: impl Into<String>,
69        headers: Vec<String>,
70    ) -> Result<Self, Error> {
71        let sequencer_endpoint = sequencer_endpoint.into();
72        let endpoint = BuiltInConnectionString::from_str(&sequencer_endpoint)?;
73        if let BuiltInConnectionString::Http(url) = endpoint {
74            let mut builder = reqwest::Client::builder()
75                // we force use tls to prevent native issues
76                .use_rustls_tls();
77
78            if !headers.is_empty() {
79                let mut header_map = reqwest::header::HeaderMap::new();
80                for header in headers {
81                    if let Some((key, value)) = header.split_once('=') {
82                        header_map.insert(
83                            key.trim()
84                                .parse::<reqwest::header::HeaderName>()
85                                .map_err(|err| Error::InvalidHeader(err.to_string()))?,
86                            value
87                                .trim()
88                                .parse::<reqwest::header::HeaderValue>()
89                                .map_err(|err| Error::InvalidHeader(err.to_string()))?,
90                        );
91                    }
92                }
93                builder = builder.default_headers(header_map);
94            }
95
96            let client = builder.build()?;
97            Self::with_http_client(url, client)
98        } else {
99            let client = ClientBuilder::default().connect_with(endpoint).await?;
100            let inner = SequencerClientInner::new(sequencer_endpoint, client);
101            Ok(Self { inner: Arc::new(inner) })
102        }
103    }
104
105    /// Creates a new [`SequencerClient`] with http transport with the given http client.
106    pub fn with_http_client(
107        sequencer_endpoint: impl Into<String>,
108        client: reqwest::Client,
109    ) -> Result<Self, Error> {
110        let sequencer_endpoint: String = sequencer_endpoint.into();
111        let url = sequencer_endpoint
112            .parse()
113            .map_err(|_| Error::InvalidUrl(sequencer_endpoint.clone()))?;
114
115        let http_client = Http::with_client(client, url);
116        let is_local = http_client.guess_local();
117        let client = ClientBuilder::default().transport(http_client, is_local);
118
119        let inner = SequencerClientInner::new(sequencer_endpoint, client);
120        Ok(Self { inner: Arc::new(inner) })
121    }
122
123    /// Returns the network of the client
124    pub fn endpoint(&self) -> &str {
125        &self.inner.sequencer_endpoint
126    }
127
128    /// Returns the client
129    pub fn client(&self) -> &Client {
130        &self.inner.client
131    }
132
133    /// Returns a reference to the [`SequencerMetrics`] for tracking client metrics.
134    fn metrics(&self) -> &SequencerMetrics {
135        &self.inner.metrics
136    }
137
138    /// Sends a [`alloy_rpc_client::RpcCall`] request to the sequencer endpoint.
139    pub async fn request<Params: RpcSend, Resp: RpcRecv>(
140        &self,
141        method: &str,
142        params: Params,
143    ) -> Result<Resp, SequencerClientError> {
144        let resp =
145            self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
146                |err| {
147                    warn!(
148                        target: "rpc::sequencer",
149                        %err,
150                        "HTTP request to sequencer failed",
151                    );
152                },
153            )?;
154        Ok(resp)
155    }
156
157    /// Forwards a transaction to the sequencer endpoint.
158    pub async fn forward_raw_transaction(&self, tx: &[u8]) -> Result<B256, SequencerClientError> {
159        let start = Instant::now();
160        let rlp_hex = hex::encode_prefixed(tx);
161        let tx_hash =
162            self.request("eth_sendRawTransaction", (rlp_hex,)).await.inspect_err(|err| {
163                warn!(
164                    target: "rpc::eth",
165                    %err,
166                    "Failed to forward transaction to sequencer",
167                );
168            })?;
169        self.metrics().record_forward_latency(start.elapsed());
170        Ok(tx_hash)
171    }
172
173    /// Forwards a transaction conditional to the sequencer endpoint.
174    pub async fn forward_raw_transaction_conditional(
175        &self,
176        tx: &[u8],
177        condition: TransactionConditional,
178    ) -> Result<B256, SequencerClientError> {
179        let start = Instant::now();
180        let rlp_hex = hex::encode_prefixed(tx);
181        let tx_hash = self
182            .request("eth_sendRawTransactionConditional", (rlp_hex, condition))
183            .await
184            .inspect_err(|err| {
185                warn!(
186                    target: "rpc::eth",
187                    %err,
188                    "Failed to forward transaction conditional for sequencer",
189                );
190            })?;
191        self.metrics().record_forward_latency(start.elapsed());
192        Ok(tx_hash)
193    }
194}
195
196#[derive(Debug)]
197struct SequencerClientInner {
198    /// The endpoint of the sequencer
199    sequencer_endpoint: String,
200    /// The client
201    client: Client,
202    // Metrics for tracking sequencer forwarding
203    metrics: SequencerMetrics,
204}
205
206#[cfg(test)]
207mod tests {
208    use super::*;
209    use alloy_primitives::U64;
210
211    #[tokio::test]
212    async fn test_http_body_str() {
213        let client = SequencerClient::new("http://localhost:8545").await.unwrap();
214
215        let request = client
216            .client()
217            .make_request("eth_getBlockByNumber", (U64::from(10),))
218            .serialize()
219            .unwrap()
220            .take_request();
221        let body = request.get();
222
223        assert_eq!(
224            body,
225            r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#
226        );
227
228        let condition = TransactionConditional::default();
229
230        let request = client
231            .client()
232            .make_request(
233                "eth_sendRawTransactionConditional",
234                (format!("0x{}", hex::encode("abcd")), condition),
235            )
236            .serialize()
237            .unwrap()
238            .take_request();
239        let body = request.get();
240
241        assert_eq!(
242            body,
243            r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#
244        );
245    }
246
247    #[tokio::test]
248    #[ignore = "Start if WS is reachable at ws://localhost:8546"]
249    async fn test_ws_body_str() {
250        let client = SequencerClient::new("ws://localhost:8546").await.unwrap();
251
252        let request = client
253            .client()
254            .make_request("eth_getBlockByNumber", (U64::from(10),))
255            .serialize()
256            .unwrap()
257            .take_request();
258        let body = request.get();
259
260        assert_eq!(
261            body,
262            r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#
263        );
264
265        let condition = TransactionConditional::default();
266
267        let request = client
268            .client()
269            .make_request(
270                "eth_sendRawTransactionConditional",
271                (format!("0x{}", hex::encode("abcd")), condition),
272            )
273            .serialize()
274            .unwrap()
275            .take_request();
276        let body = request.get();
277
278        assert_eq!(
279            body,
280            r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#
281        );
282    }
283}