1use crate::{SequencerClientError, SequencerMetrics};
4use alloy_json_rpc::{RpcRecv, RpcSend};
5use alloy_primitives::{hex, B256};
6use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, RpcClient as Client};
7use alloy_rpc_types_eth::erc4337::TransactionConditional;
8use alloy_transport_http::Http;
9use std::{str::FromStr, sync::Arc, time::Instant};
10use thiserror::Error;
11use tracing::warn;
12
13#[derive(Error, Debug)]
15pub enum Error {
16 #[error("Invalid scheme of sequencer url: {0}")]
18 InvalidScheme(String),
19 #[error("Invalid header: {0}")]
21 InvalidHeader(String),
22 #[error("Invalid sequencer url: {0}")]
24 InvalidUrl(String),
25 #[error("Failed to connect to sequencer: {0}")]
27 TransportError(
28 #[from]
29 #[source]
30 alloy_transport::TransportError,
31 ),
32 #[error("Failed to init reqwest client for sequencer: {0}")]
34 ReqwestError(
35 #[from]
36 #[source]
37 reqwest::Error,
38 ),
39}
40
41#[derive(Debug, Clone)]
43pub struct SequencerClient {
44 inner: Arc<SequencerClientInner>,
45}
46
47impl SequencerClientInner {
48 pub(crate) fn new(sequencer_endpoint: String, client: Client) -> Self {
50 let metrics = SequencerMetrics::default();
51 Self { sequencer_endpoint, client, metrics }
52 }
53}
54
55impl SequencerClient {
56 pub async fn new(sequencer_endpoint: impl Into<String>) -> Result<Self, Error> {
60 Self::new_with_headers(sequencer_endpoint, Default::default()).await
61 }
62
63 pub async fn new_with_headers(
67 sequencer_endpoint: impl Into<String>,
68 headers: Vec<String>,
69 ) -> Result<Self, Error> {
70 let sequencer_endpoint = sequencer_endpoint.into();
71 let endpoint = BuiltInConnectionString::from_str(&sequencer_endpoint)?;
72 if let BuiltInConnectionString::Http(url) = endpoint {
73 let mut builder = reqwest::Client::builder()
74 .use_rustls_tls();
76
77 if !headers.is_empty() {
78 let mut header_map = reqwest::header::HeaderMap::new();
79 for header in headers {
80 if let Some((key, value)) = header.split_once('=') {
81 header_map.insert(
82 key.trim()
83 .parse::<reqwest::header::HeaderName>()
84 .map_err(|err| Error::InvalidHeader(err.to_string()))?,
85 value
86 .trim()
87 .parse::<reqwest::header::HeaderValue>()
88 .map_err(|err| Error::InvalidHeader(err.to_string()))?,
89 );
90 }
91 }
92 builder = builder.default_headers(header_map);
93 }
94
95 let client = builder.build()?;
96 Self::with_http_client(url, client)
97 } else {
98 let client = ClientBuilder::default().connect_with(endpoint).await?;
99 let inner = SequencerClientInner::new(sequencer_endpoint, client);
100 Ok(Self { inner: Arc::new(inner) })
101 }
102 }
103
104 pub fn with_http_client(
106 sequencer_endpoint: impl Into<String>,
107 client: reqwest::Client,
108 ) -> Result<Self, Error> {
109 let sequencer_endpoint: String = sequencer_endpoint.into();
110 let url = sequencer_endpoint
111 .parse()
112 .map_err(|_| Error::InvalidUrl(sequencer_endpoint.clone()))?;
113
114 let http_client = Http::with_client(client, url);
115 let is_local = http_client.guess_local();
116 let client = ClientBuilder::default().transport(http_client, is_local);
117
118 let inner = SequencerClientInner::new(sequencer_endpoint, client);
119 Ok(Self { inner: Arc::new(inner) })
120 }
121
122 pub fn endpoint(&self) -> &str {
124 &self.inner.sequencer_endpoint
125 }
126
127 pub fn client(&self) -> &Client {
129 &self.inner.client
130 }
131
132 fn metrics(&self) -> &SequencerMetrics {
134 &self.inner.metrics
135 }
136
137 pub async fn request<Params: RpcSend, Resp: RpcRecv>(
139 &self,
140 method: &str,
141 params: Params,
142 ) -> Result<Resp, SequencerClientError> {
143 let resp =
144 self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
145 |err| {
146 warn!(
147 target: "rpc::sequencer",
148 %err,
149 "HTTP request to sequencer failed",
150 );
151 },
152 )?;
153 Ok(resp)
154 }
155
156 pub async fn forward_raw_transaction(&self, tx: &[u8]) -> Result<B256, SequencerClientError> {
158 let start = Instant::now();
159 let rlp_hex = hex::encode_prefixed(tx);
160 let tx_hash =
161 self.request("eth_sendRawTransaction", (rlp_hex,)).await.inspect_err(|err| {
162 warn!(
163 target: "rpc::eth",
164 %err,
165 "Failed to forward transaction to sequencer",
166 );
167 })?;
168 self.metrics().record_forward_latency(start.elapsed());
169 Ok(tx_hash)
170 }
171
172 pub async fn forward_raw_transaction_conditional(
174 &self,
175 tx: &[u8],
176 condition: TransactionConditional,
177 ) -> Result<B256, SequencerClientError> {
178 let start = Instant::now();
179 let rlp_hex = hex::encode_prefixed(tx);
180 let tx_hash = self
181 .request("eth_sendRawTransactionConditional", (rlp_hex, condition))
182 .await
183 .inspect_err(|err| {
184 warn!(
185 target: "rpc::eth",
186 %err,
187 "Failed to forward transaction conditional for sequencer",
188 );
189 })?;
190 self.metrics().record_forward_latency(start.elapsed());
191 Ok(tx_hash)
192 }
193}
194
195#[derive(Debug)]
196struct SequencerClientInner {
197 sequencer_endpoint: String,
199 client: Client,
201 metrics: SequencerMetrics,
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::U64;

    /// Expected serialized body of the `eth_getBlockByNumber` request.
    const GET_BLOCK_BODY: &str =
        r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"#;

    /// Expected serialized body of the `eth_sendRawTransactionConditional` request.
    const SEND_CONDITIONAL_BODY: &str = r#"{"method":"eth_sendRawTransactionConditional","params":["0x61626364",{"knownAccounts":{}}],"id":1,"jsonrpc":"2.0"}"#;

    /// Serializes two requests on `client` and compares their JSON bodies with the expected
    /// strings.
    ///
    /// The expected bodies carry the ids `0` and `1`, so this presumably has to run on a client
    /// that has not created any request yet.
    fn assert_serialized_bodies(client: &SequencerClient) {
        let block_request = client
            .client()
            .make_request("eth_getBlockByNumber", (U64::from(10),))
            .serialize()
            .unwrap()
            .take_request();
        assert_eq!(block_request.get(), GET_BLOCK_BODY);

        let conditional_request = client
            .client()
            .make_request(
                "eth_sendRawTransactionConditional",
                (hex::encode_prefixed("abcd"), TransactionConditional::default()),
            )
            .serialize()
            .unwrap()
            .take_request();
        assert_eq!(conditional_request.get(), SEND_CONDITIONAL_BODY);
    }

    /// Checks request serialization for a client created from an HTTP endpoint.
    #[tokio::test]
    async fn test_http_body_str() {
        let client = SequencerClient::new("http://localhost:8545").await.unwrap();
        assert_serialized_bodies(&client);
    }

    /// Checks request serialization for a client created from a WebSocket endpoint.
    #[tokio::test]
    #[ignore = "Start if WS is reachable at ws://localhost:8546"]
    async fn test_ws_body_str() {
        let client = SequencerClient::new("ws://localhost:8546").await.unwrap();
        assert_serialized_bodies(&client);
    }
}