1use crate::SequencerClientError;
4use alloy_json_rpc::{RpcRecv, RpcSend};
5use alloy_primitives::{hex, B256};
6use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, RpcClient as Client};
7use alloy_rpc_types_eth::erc4337::TransactionConditional;
8use alloy_transport_http::Http;
9use reth_optimism_txpool::supervisor::metrics::SequencerMetrics;
10use std::{str::FromStr, sync::Arc, time::Instant};
11use thiserror::Error;
12use tracing::warn;
13
14#[derive(Error, Debug)]
16pub enum Error {
17 #[error("Invalid scheme of sequencer url: {0}")]
19 InvalidScheme(String),
20 #[error("Invalid header: {0}")]
22 InvalidHeader(String),
23 #[error("Invalid sequencer url: {0}")]
25 InvalidUrl(String),
26 #[error("Failed to connect to sequencer: {0}")]
28 TransportError(
29 #[from]
30 #[source]
31 alloy_transport::TransportError,
32 ),
33 #[error("Failed to init reqwest client for sequencer: {0}")]
35 ReqwestError(
36 #[from]
37 #[source]
38 reqwest::Error,
39 ),
40}
41
42#[derive(Debug, Clone)]
44pub struct SequencerClient {
45 inner: Arc<SequencerClientInner>,
46}
47
48impl SequencerClientInner {
49 pub(crate) fn new(sequencer_endpoint: String, client: Client) -> Self {
51 let metrics = SequencerMetrics::default();
52 Self { sequencer_endpoint, client, metrics }
53 }
54}
55
56impl SequencerClient {
57 pub async fn new(sequencer_endpoint: impl Into<String>) -> Result<Self, Error> {
61 Self::new_with_headers(sequencer_endpoint, Default::default()).await
62 }
63
64 pub async fn new_with_headers(
68 sequencer_endpoint: impl Into<String>,
69 headers: Vec<String>,
70 ) -> Result<Self, Error> {
71 let sequencer_endpoint = sequencer_endpoint.into();
72 let endpoint = BuiltInConnectionString::from_str(&sequencer_endpoint)?;
73 if let BuiltInConnectionString::Http(url) = endpoint {
74 let mut builder = reqwest::Client::builder()
75 .use_rustls_tls();
77
78 if !headers.is_empty() {
79 let mut header_map = reqwest::header::HeaderMap::new();
80 for header in headers {
81 if let Some((key, value)) = header.split_once('=') {
82 header_map.insert(
83 key.trim()
84 .parse::<reqwest::header::HeaderName>()
85 .map_err(|err| Error::InvalidHeader(err.to_string()))?,
86 value
87 .trim()
88 .parse::<reqwest::header::HeaderValue>()
89 .map_err(|err| Error::InvalidHeader(err.to_string()))?,
90 );
91 }
92 }
93 builder = builder.default_headers(header_map);
94 }
95
96 let client = builder.build()?;
97 Self::with_http_client(url, client)
98 } else {
99 let client = ClientBuilder::default().connect_with(endpoint).await?;
100 let inner = SequencerClientInner::new(sequencer_endpoint, client);
101 Ok(Self { inner: Arc::new(inner) })
102 }
103 }
104
105 pub fn with_http_client(
107 sequencer_endpoint: impl Into<String>,
108 client: reqwest::Client,
109 ) -> Result<Self, Error> {
110 let sequencer_endpoint: String = sequencer_endpoint.into();
111 let url = sequencer_endpoint
112 .parse()
113 .map_err(|_| Error::InvalidUrl(sequencer_endpoint.clone()))?;
114
115 let http_client = Http::with_client(client, url);
116 let is_local = http_client.guess_local();
117 let client = ClientBuilder::default().transport(http_client, is_local);
118
119 let inner = SequencerClientInner::new(sequencer_endpoint, client);
120 Ok(Self { inner: Arc::new(inner) })
121 }
122
123 pub fn endpoint(&self) -> &str {
125 &self.inner.sequencer_endpoint
126 }
127
128 pub fn client(&self) -> &Client {
130 &self.inner.client
131 }
132
133 fn metrics(&self) -> &SequencerMetrics {
135 &self.inner.metrics
136 }
137
138 pub async fn request<Params: RpcSend, Resp: RpcRecv>(
140 &self,
141 method: &str,
142 params: Params,
143 ) -> Result<Resp, SequencerClientError> {
144 let resp =
145 self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
146 |err| {
147 warn!(
148 target: "rpc::sequencer",
149 %err,
150 "HTTP request to sequencer failed",
151 );
152 },
153 )?;
154 Ok(resp)
155 }
156
157 pub async fn forward_raw_transaction(&self, tx: &[u8]) -> Result<B256, SequencerClientError> {
159 let start = Instant::now();
160 let rlp_hex = hex::encode_prefixed(tx);
161 let tx_hash =
162 self.request("eth_sendRawTransaction", (rlp_hex,)).await.inspect_err(|err| {
163 warn!(
164 target: "rpc::eth",
165 %err,
166 "Failed to forward transaction to sequencer",
167 );
168 })?;
169 self.metrics().record_forward_latency(start.elapsed());
170 Ok(tx_hash)
171 }
172
173 pub async fn forward_raw_transaction_conditional(
175 &self,
176 tx: &[u8],
177 condition: TransactionConditional,
178 ) -> Result<B256, SequencerClientError> {
179 let start = Instant::now();
180 let rlp_hex = hex::encode_prefixed(tx);
181 let tx_hash = self
182 .request("eth_sendRawTransactionConditional", (rlp_hex, condition))
183 .await
184 .inspect_err(|err| {
185 warn!(
186 target: "rpc::eth",
187 %err,
188 "Failed to forward transaction conditional for sequencer",
189 );
190 })?;
191 self.metrics().record_forward_latency(start.elapsed());
192 Ok(tx_hash)
193 }
194}
195
196#[derive(Debug)]
197struct SequencerClientInner {
198 sequencer_endpoint: String,
200 client: Client,
202 metrics: SequencerMetrics,
204}
205
206#[cfg(test)]
207mod tests {
208 use super::*;
209 use alloy_primitives::U64;
210
211 #[tokio::test]
212 async fn test_http_body_str() {
213 let client = SequencerClient::new("http://localhost:8545").await.unwrap();
214
215 let request = client
216 .client()
217 .make_request("eth_getBlockByNumber", (U64::from(10),))
218 .serialize()
219 .unwrap()
220 .take_request();
221 let body = request.get();
222
223 assert_eq!(
224 body,
225 r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#
226 );
227
228 let condition = TransactionConditional::default();
229
230 let request = client
231 .client()
232 .make_request(
233 "eth_sendRawTransactionConditional",
234 (format!("0x{}", hex::encode("abcd")), condition),
235 )
236 .serialize()
237 .unwrap()
238 .take_request();
239 let body = request.get();
240
241 assert_eq!(
242 body,
243 r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#
244 );
245 }
246
247 #[tokio::test]
248 #[ignore = "Start if WS is reachable at ws://localhost:8546"]
249 async fn test_ws_body_str() {
250 let client = SequencerClient::new("ws://localhost:8546").await.unwrap();
251
252 let request = client
253 .client()
254 .make_request("eth_getBlockByNumber", (U64::from(10),))
255 .serialize()
256 .unwrap()
257 .take_request();
258 let body = request.get();
259
260 assert_eq!(
261 body,
262 r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#
263 );
264
265 let condition = TransactionConditional::default();
266
267 let request = client
268 .client()
269 .make_request(
270 "eth_sendRawTransactionConditional",
271 (format!("0x{}", hex::encode("abcd")), condition),
272 )
273 .serialize()
274 .unwrap()
275 .take_request();
276 let body = request.get();
277
278 assert_eq!(
279 body,
280 r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#
281 );
282 }
283}