reth_rpc_builder/
auth.rs

1use crate::{
2    error::{RpcError, ServerKind},
3    middleware::RethRpcMiddleware,
4};
5use http::header::AUTHORIZATION;
6use jsonrpsee::{
7    core::{client::SubscriptionClientT, RegisterMethodError},
8    http_client::HeaderMap,
9    server::{AlreadyStoppedError, RpcModule},
10    ws_client::RpcServiceBuilder,
11    Methods,
12};
13use reth_rpc_api::servers::*;
14use reth_rpc_eth_types::EthSubscriptionIdProvider;
15use reth_rpc_layer::{
16    secret_to_bearer_header, AuthClientLayer, AuthLayer, JwtAuthValidator, JwtSecret,
17};
18use reth_rpc_server_types::constants;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use tower::layer::util::Identity;
21
22pub use jsonrpsee::server::ServerBuilder;
23use jsonrpsee::server::{ServerConfig, ServerConfigBuilder};
24pub use reth_ipc::server::Builder as IpcServerBuilder;
25
26/// Server configuration for the auth server.
27#[derive(Debug)]
28pub struct AuthServerConfig<RpcMiddleware = Identity> {
29    /// Where the server should listen.
30    pub(crate) socket_addr: SocketAddr,
31    /// The secret for the auth layer of the server.
32    pub(crate) secret: JwtSecret,
33    /// Configs for JSON-RPC Http.
34    pub(crate) server_config: ServerConfigBuilder,
35    /// Configs for IPC server
36    pub(crate) ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
37    /// IPC endpoint
38    pub(crate) ipc_endpoint: Option<String>,
39    /// Configurable RPC middleware
40    pub(crate) rpc_middleware: RpcMiddleware,
41}
42
43// === impl AuthServerConfig ===
44
45impl AuthServerConfig {
46    /// Convenience function to create a new `AuthServerConfig`.
47    pub const fn builder(secret: JwtSecret) -> AuthServerConfigBuilder {
48        AuthServerConfigBuilder::new(secret)
49    }
50}
51impl<RpcMiddleware> AuthServerConfig<RpcMiddleware> {
52    /// Returns the address the server will listen on.
53    pub const fn address(&self) -> SocketAddr {
54        self.socket_addr
55    }
56
57    /// Configures the rpc middleware.
58    pub fn with_rpc_middleware<T>(self, rpc_middleware: T) -> AuthServerConfig<T> {
59        let Self { socket_addr, secret, server_config, ipc_server_config, ipc_endpoint, .. } = self;
60        AuthServerConfig {
61            socket_addr,
62            secret,
63            server_config,
64            ipc_server_config,
65            ipc_endpoint,
66            rpc_middleware,
67        }
68    }
69
70    /// Convenience function to start a server in one step.
71    pub async fn start(self, module: AuthRpcModule) -> Result<AuthServerHandle, RpcError>
72    where
73        RpcMiddleware: RethRpcMiddleware,
74    {
75        let Self {
76            socket_addr,
77            secret,
78            server_config,
79            ipc_server_config,
80            ipc_endpoint,
81            rpc_middleware,
82        } = self;
83
84        // Create auth middleware.
85        let middleware =
86            tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret)));
87
88        let rpc_middleware = RpcServiceBuilder::default().layer(rpc_middleware);
89
90        // By default, both http and ws are enabled.
91        let server = ServerBuilder::new()
92            .set_config(server_config.build())
93            .set_http_middleware(middleware)
94            .set_rpc_middleware(rpc_middleware)
95            .build(socket_addr)
96            .await
97            .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
98
99        let local_addr = server
100            .local_addr()
101            .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
102
103        let handle = server.start(module.inner.clone());
104
105        let ipc_handle = if let Some(ipc_server_config) = ipc_server_config {
106            let ipc_endpoint_str = ipc_endpoint
107                .clone()
108                .unwrap_or_else(|| constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string());
109            let ipc_server = ipc_server_config.build(ipc_endpoint_str);
110            let res = ipc_server.start(module.inner).await?;
111            Some(res)
112        } else {
113            None
114        };
115
116        Ok(AuthServerHandle { handle: Some(handle), local_addr, secret, ipc_endpoint, ipc_handle })
117    }
118}
119
120/// Builder type for configuring an `AuthServerConfig`.
121#[derive(Debug)]
122pub struct AuthServerConfigBuilder<RpcMiddleware = Identity> {
123    socket_addr: Option<SocketAddr>,
124    secret: JwtSecret,
125    server_config: Option<ServerConfigBuilder>,
126    ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
127    ipc_endpoint: Option<String>,
128    rpc_middleware: RpcMiddleware,
129}
130
131// === impl AuthServerConfigBuilder ===
132
133impl AuthServerConfigBuilder {
134    /// Create a new `AuthServerConfigBuilder` with the given `secret`.
135    pub const fn new(secret: JwtSecret) -> Self {
136        Self {
137            socket_addr: None,
138            secret,
139            server_config: None,
140            ipc_server_config: None,
141            ipc_endpoint: None,
142            rpc_middleware: Identity::new(),
143        }
144    }
145}
146
147impl<RpcMiddleware> AuthServerConfigBuilder<RpcMiddleware> {
148    /// Configures the rpc middleware.
149    pub fn with_rpc_middleware<T>(self, rpc_middleware: T) -> AuthServerConfigBuilder<T> {
150        let Self { socket_addr, secret, server_config, ipc_server_config, ipc_endpoint, .. } = self;
151        AuthServerConfigBuilder {
152            socket_addr,
153            secret,
154            server_config,
155            ipc_server_config,
156            ipc_endpoint,
157            rpc_middleware,
158        }
159    }
160
161    /// Set the socket address for the server.
162    pub const fn socket_addr(mut self, socket_addr: SocketAddr) -> Self {
163        self.socket_addr = Some(socket_addr);
164        self
165    }
166
167    /// Set the socket address for the server.
168    pub const fn maybe_socket_addr(mut self, socket_addr: Option<SocketAddr>) -> Self {
169        self.socket_addr = socket_addr;
170        self
171    }
172
173    /// Set the secret for the server.
174    pub const fn secret(mut self, secret: JwtSecret) -> Self {
175        self.secret = secret;
176        self
177    }
178
179    /// Configures the JSON-RPC server
180    ///
181    /// Note: this always configures an [`EthSubscriptionIdProvider`]
182    /// [`IdProvider`](jsonrpsee::server::IdProvider) for convenience.
183    pub fn with_server_config(mut self, config: ServerConfigBuilder) -> Self {
184        self.server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
185        self
186    }
187
188    /// Set the ipc endpoint for the server.
189    pub fn ipc_endpoint(mut self, ipc_endpoint: String) -> Self {
190        self.ipc_endpoint = Some(ipc_endpoint);
191        self
192    }
193
194    /// Configures the IPC server
195    ///
196    /// Note: this always configures an [`EthSubscriptionIdProvider`]
197    pub fn with_ipc_config(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
198        self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
199        self
200    }
201
202    /// Build the `AuthServerConfig`.
203    pub fn build(self) -> AuthServerConfig<RpcMiddleware> {
204        AuthServerConfig {
205            socket_addr: self.socket_addr.unwrap_or_else(|| {
206                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), constants::DEFAULT_AUTH_PORT)
207            }),
208            secret: self.secret,
209            server_config: self.server_config.unwrap_or_else(|| {
210                ServerConfig::builder()
211                    // This needs to large enough to handle large eth_getLogs responses and
212                    // maximum payload bodies limit for
213                    // `engine_getPayloadBodiesByRangeV` ~750MB per
214                    // response should be enough
215                    .max_response_body_size(750 * 1024 * 1024)
216                    // Connections to this server are always authenticated, hence this only
217                    // affects connections from the CL or any other
218                    // client that uses JWT, this should be
219                    // more than enough so that the CL (or multiple CL nodes) will never get
220                    // rate limited
221                    .max_connections(500)
222                    // bump the default request size slightly, there aren't any methods exposed
223                    // with dynamic request params that can exceed this
224                    .max_request_body_size(128 * 1024 * 1024)
225                    .set_id_provider(EthSubscriptionIdProvider::default())
226            }),
227            ipc_server_config: self.ipc_server_config.map(|ipc_server_config| {
228                ipc_server_config
229                    .max_response_body_size(750 * 1024 * 1024)
230                    .max_connections(500)
231                    .max_request_body_size(128 * 1024 * 1024)
232                    .set_id_provider(EthSubscriptionIdProvider::default())
233            }),
234            ipc_endpoint: self.ipc_endpoint,
235            rpc_middleware: self.rpc_middleware,
236        }
237    }
238}
239
240/// Holds installed modules for the auth server.
241#[derive(Debug, Clone)]
242pub struct AuthRpcModule {
243    pub(crate) inner: RpcModule<()>,
244}
245
246impl AuthRpcModule {
247    /// Create a new `AuthRpcModule` with the given `engine_api`.
248    pub fn new(engine: impl IntoEngineApiRpcModule) -> Self {
249        Self { inner: engine.into_rpc_module() }
250    }
251
252    /// Get a reference to the inner `RpcModule`.
253    pub const fn module_mut(&mut self) -> &mut RpcModule<()> {
254        &mut self.inner
255    }
256
257    /// Merge the given [Methods] in the configured authenticated methods.
258    ///
259    /// Fails if any of the methods in other is present already.
260    pub fn merge_auth_methods(
261        &mut self,
262        other: impl Into<Methods>,
263    ) -> Result<bool, RegisterMethodError> {
264        self.module_mut().merge(other.into()).map(|_| true)
265    }
266
267    /// Removes the method with the given name from the configured authenticated methods.
268    ///
269    /// Returns `true` if the method was found and removed, `false` otherwise.
270    pub fn remove_auth_method(&mut self, method_name: &'static str) -> bool {
271        self.module_mut().remove_method(method_name).is_some()
272    }
273
274    /// Removes the given methods from the configured authenticated methods.
275    pub fn remove_auth_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
276        for name in methods {
277            self.remove_auth_method(name);
278        }
279    }
280
281    /// Replace the given [Methods] in the configured authenticated methods.
282    pub fn replace_auth_methods(
283        &mut self,
284        other: impl Into<Methods>,
285    ) -> Result<bool, RegisterMethodError> {
286        let other = other.into();
287        self.remove_auth_methods(other.method_names());
288        self.merge_auth_methods(other)
289    }
290
291    /// Convenience function for starting a server
292    pub async fn start_server(
293        self,
294        config: AuthServerConfig,
295    ) -> Result<AuthServerHandle, RpcError> {
296        config.start(self).await
297    }
298}
299
300/// A handle to the spawned auth server.
301///
302/// When this type is dropped or [`AuthServerHandle::stop`] has been called the server will be
303/// stopped.
304#[derive(Clone, Debug)]
305#[must_use = "Server stops if dropped"]
306pub struct AuthServerHandle {
307    local_addr: SocketAddr,
308    handle: Option<jsonrpsee::server::ServerHandle>,
309    secret: JwtSecret,
310    ipc_endpoint: Option<String>,
311    ipc_handle: Option<jsonrpsee::server::ServerHandle>,
312}
313
314// === impl AuthServerHandle ===
315
316impl AuthServerHandle {
317    /// Creates a new handle that isn't connected to any server.
318    ///
319    /// This can be used to satisfy types that require an engine API.
320    pub fn noop() -> Self {
321        Self {
322            local_addr: SocketAddr::new(
323                IpAddr::V4(Ipv4Addr::LOCALHOST),
324                constants::DEFAULT_AUTH_PORT,
325            ),
326            handle: None,
327            secret: JwtSecret::random(),
328            ipc_endpoint: None,
329            ipc_handle: None,
330        }
331    }
332
333    /// Returns the [`SocketAddr`] of the http server if started.
334    pub const fn local_addr(&self) -> SocketAddr {
335        self.local_addr
336    }
337
338    /// Tell the server to stop without waiting for the server to stop.
339    pub fn stop(self) -> Result<(), AlreadyStoppedError> {
340        let Some(handle) = self.handle else { return Ok(()) };
341        handle.stop()
342    }
343
344    /// Returns the url to the http server
345    pub fn http_url(&self) -> String {
346        format!("http://{}", self.local_addr)
347    }
348
349    /// Returns the url to the ws server
350    pub fn ws_url(&self) -> String {
351        format!("ws://{}", self.local_addr)
352    }
353
354    /// Returns a http client connected to the server.
355    ///
356    /// This client uses the JWT token to authenticate requests.
357    pub fn http_client(&self) -> impl SubscriptionClientT + Clone + Send + Sync + Unpin + 'static {
358        // Create a middleware that adds a new JWT token to every request.
359        let secret_layer = AuthClientLayer::new(self.secret);
360        let middleware = tower::ServiceBuilder::default().layer(secret_layer);
361        jsonrpsee::http_client::HttpClientBuilder::default()
362            .set_http_middleware(middleware)
363            .build(self.http_url())
364            .expect("Failed to create http client")
365    }
366
367    /// Returns a ws client connected to the server. Note that the connection can only be
368    /// be established within 1 minute due to the JWT token expiration.
369    pub async fn ws_client(&self) -> jsonrpsee::ws_client::WsClient {
370        jsonrpsee::ws_client::WsClientBuilder::default()
371            .set_headers(HeaderMap::from_iter([(
372                AUTHORIZATION,
373                secret_to_bearer_header(&self.secret),
374            )]))
375            .build(self.ws_url())
376            .await
377            .expect("Failed to create ws client")
378    }
379
380    /// Returns an ipc client connected to the server.
381    #[cfg(unix)]
382    pub async fn ipc_client(&self) -> Option<jsonrpsee::async_client::Client> {
383        use reth_ipc::client::IpcClientBuilder;
384
385        if let Some(ipc_endpoint) = &self.ipc_endpoint {
386            return Some(
387                IpcClientBuilder::default()
388                    .build(ipc_endpoint)
389                    .await
390                    .expect("Failed to create ipc client"),
391            )
392        }
393        None
394    }
395
396    /// Returns an ipc handle
397    pub fn ipc_handle(&self) -> Option<jsonrpsee::server::ServerHandle> {
398        self.ipc_handle.clone()
399    }
400
401    /// Return an ipc endpoint
402    pub fn ipc_endpoint(&self) -> Option<String> {
403        self.ipc_endpoint.clone()
404    }
405}