reth_rpc_builder/
auth.rs

1use crate::error::{RpcError, ServerKind};
2use http::header::AUTHORIZATION;
3use jsonrpsee::{
4    core::RegisterMethodError,
5    http_client::{transport::HttpBackend, HeaderMap},
6    server::{AlreadyStoppedError, RpcModule},
7    Methods,
8};
9use reth_rpc_api::servers::*;
10use reth_rpc_eth_types::EthSubscriptionIdProvider;
11use reth_rpc_layer::{
12    secret_to_bearer_header, AuthClientLayer, AuthClientService, AuthLayer, JwtAuthValidator,
13    JwtSecret,
14};
15use reth_rpc_server_types::constants;
16use std::net::{IpAddr, Ipv4Addr, SocketAddr};
17use tower::layer::util::Identity;
18
19pub use jsonrpsee::server::ServerBuilder;
20pub use reth_ipc::server::Builder as IpcServerBuilder;
21
22/// Server configuration for the auth server.
23#[derive(Debug)]
24pub struct AuthServerConfig {
25    /// Where the server should listen.
26    pub(crate) socket_addr: SocketAddr,
27    /// The secret for the auth layer of the server.
28    pub(crate) secret: JwtSecret,
29    /// Configs for JSON-RPC Http.
30    pub(crate) server_config: ServerBuilder<Identity, Identity>,
31    /// Configs for IPC server
32    pub(crate) ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
33    /// IPC endpoint
34    pub(crate) ipc_endpoint: Option<String>,
35}
36
37// === impl AuthServerConfig ===
38
39impl AuthServerConfig {
40    /// Convenience function to create a new `AuthServerConfig`.
41    pub const fn builder(secret: JwtSecret) -> AuthServerConfigBuilder {
42        AuthServerConfigBuilder::new(secret)
43    }
44
45    /// Returns the address the server will listen on.
46    pub const fn address(&self) -> SocketAddr {
47        self.socket_addr
48    }
49
50    /// Convenience function to start a server in one step.
51    pub async fn start(self, module: AuthRpcModule) -> Result<AuthServerHandle, RpcError> {
52        let Self { socket_addr, secret, server_config, ipc_server_config, ipc_endpoint } = self;
53
54        // Create auth middleware.
55        let middleware =
56            tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret)));
57
58        // By default, both http and ws are enabled.
59        let server = server_config
60            .set_http_middleware(middleware)
61            .build(socket_addr)
62            .await
63            .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
64
65        let local_addr = server
66            .local_addr()
67            .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
68
69        let handle = server.start(module.inner.clone());
70        let mut ipc_handle: Option<jsonrpsee::server::ServerHandle> = None;
71
72        if let Some(ipc_server_config) = ipc_server_config {
73            let ipc_endpoint_str = ipc_endpoint
74                .clone()
75                .unwrap_or_else(|| constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string());
76            let ipc_server = ipc_server_config.build(ipc_endpoint_str);
77            let res = ipc_server.start(module.inner).await?;
78            ipc_handle = Some(res);
79        }
80
81        Ok(AuthServerHandle { handle, local_addr, secret, ipc_endpoint, ipc_handle })
82    }
83}
84
85/// Builder type for configuring an `AuthServerConfig`.
86#[derive(Debug)]
87pub struct AuthServerConfigBuilder {
88    socket_addr: Option<SocketAddr>,
89    secret: JwtSecret,
90    server_config: Option<ServerBuilder<Identity, Identity>>,
91    ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
92    ipc_endpoint: Option<String>,
93}
94
95// === impl AuthServerConfigBuilder ===
96
97impl AuthServerConfigBuilder {
98    /// Create a new `AuthServerConfigBuilder` with the given `secret`.
99    pub const fn new(secret: JwtSecret) -> Self {
100        Self {
101            socket_addr: None,
102            secret,
103            server_config: None,
104            ipc_server_config: None,
105            ipc_endpoint: None,
106        }
107    }
108
109    /// Set the socket address for the server.
110    pub const fn socket_addr(mut self, socket_addr: SocketAddr) -> Self {
111        self.socket_addr = Some(socket_addr);
112        self
113    }
114
115    /// Set the socket address for the server.
116    pub const fn maybe_socket_addr(mut self, socket_addr: Option<SocketAddr>) -> Self {
117        self.socket_addr = socket_addr;
118        self
119    }
120
121    /// Set the secret for the server.
122    pub const fn secret(mut self, secret: JwtSecret) -> Self {
123        self.secret = secret;
124        self
125    }
126
127    /// Configures the JSON-RPC server
128    ///
129    /// Note: this always configures an [`EthSubscriptionIdProvider`]
130    /// [`IdProvider`](jsonrpsee::server::IdProvider) for convenience.
131    pub fn with_server_config(mut self, config: ServerBuilder<Identity, Identity>) -> Self {
132        self.server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
133        self
134    }
135
136    /// Set the ipc endpoint for the server.
137    pub fn ipc_endpoint(mut self, ipc_endpoint: String) -> Self {
138        self.ipc_endpoint = Some(ipc_endpoint);
139        self
140    }
141
142    /// Configures the IPC server
143    ///
144    /// Note: this always configures an [`EthSubscriptionIdProvider`]
145    pub fn with_ipc_config(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
146        self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
147        self
148    }
149
150    /// Build the `AuthServerConfig`.
151    pub fn build(self) -> AuthServerConfig {
152        AuthServerConfig {
153            socket_addr: self.socket_addr.unwrap_or_else(|| {
154                SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), constants::DEFAULT_AUTH_PORT)
155            }),
156            secret: self.secret,
157            server_config: self.server_config.unwrap_or_else(|| {
158                ServerBuilder::new()
159                    // This needs to large enough to handle large eth_getLogs responses and maximum
160                    // payload bodies limit for `engine_getPayloadBodiesByRangeV`
161                    // ~750MB per response should be enough
162                    .max_response_body_size(750 * 1024 * 1024)
163                    // Connections to this server are always authenticated, hence this only affects
164                    // connections from the CL or any other client that uses JWT, this should be
165                    // more than enough so that the CL (or multiple CL nodes) will never get rate
166                    // limited
167                    .max_connections(500)
168                    // bump the default request size slightly, there aren't any methods exposed with
169                    // dynamic request params that can exceed this
170                    .max_request_body_size(128 * 1024 * 1024)
171                    .set_id_provider(EthSubscriptionIdProvider::default())
172            }),
173            ipc_server_config: self.ipc_server_config.map(|ipc_server_config| {
174                ipc_server_config
175                    .max_response_body_size(750 * 1024 * 1024)
176                    .max_connections(500)
177                    .max_request_body_size(128 * 1024 * 1024)
178                    .set_id_provider(EthSubscriptionIdProvider::default())
179            }),
180            ipc_endpoint: self.ipc_endpoint,
181        }
182    }
183}
184
185/// Holds installed modules for the auth server.
186#[derive(Debug, Clone)]
187pub struct AuthRpcModule {
188    pub(crate) inner: RpcModule<()>,
189}
190
191impl AuthRpcModule {
192    /// Create a new `AuthRpcModule` with the given `engine_api`.
193    pub fn new(engine: impl IntoEngineApiRpcModule) -> Self {
194        Self { inner: engine.into_rpc_module() }
195    }
196
197    /// Get a reference to the inner `RpcModule`.
198    pub const fn module_mut(&mut self) -> &mut RpcModule<()> {
199        &mut self.inner
200    }
201
202    /// Merge the given [Methods] in the configured authenticated methods.
203    ///
204    /// Fails if any of the methods in other is present already.
205    pub fn merge_auth_methods(
206        &mut self,
207        other: impl Into<Methods>,
208    ) -> Result<bool, RegisterMethodError> {
209        self.module_mut().merge(other.into()).map(|_| true)
210    }
211
212    /// Removes the method with the given name from the configured authenticated methods.
213    ///
214    /// Returns `true` if the method was found and removed, `false` otherwise.
215    pub fn remove_auth_method(&mut self, method_name: &'static str) -> bool {
216        self.module_mut().remove_method(method_name).is_some()
217    }
218
219    /// Removes the given methods from the configured authenticated methods.
220    pub fn remove_auth_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
221        for name in methods {
222            self.remove_auth_method(name);
223        }
224    }
225
226    /// Replace the given [Methods] in the configured authenticated methods.
227    pub fn replace_auth_methods(
228        &mut self,
229        other: impl Into<Methods>,
230    ) -> Result<bool, RegisterMethodError> {
231        let other = other.into();
232        self.remove_auth_methods(other.method_names());
233        self.merge_auth_methods(other)
234    }
235
236    /// Convenience function for starting a server
237    pub async fn start_server(
238        self,
239        config: AuthServerConfig,
240    ) -> Result<AuthServerHandle, RpcError> {
241        config.start(self).await
242    }
243}
244
245/// A handle to the spawned auth server.
246///
247/// When this type is dropped or [`AuthServerHandle::stop`] has been called the server will be
248/// stopped.
249#[derive(Clone, Debug)]
250#[must_use = "Server stops if dropped"]
251pub struct AuthServerHandle {
252    local_addr: SocketAddr,
253    handle: jsonrpsee::server::ServerHandle,
254    secret: JwtSecret,
255    ipc_endpoint: Option<String>,
256    ipc_handle: Option<jsonrpsee::server::ServerHandle>,
257}
258
259// === impl AuthServerHandle ===
260
261impl AuthServerHandle {
262    /// Returns the [`SocketAddr`] of the http server if started.
263    pub const fn local_addr(&self) -> SocketAddr {
264        self.local_addr
265    }
266
267    /// Tell the server to stop without waiting for the server to stop.
268    pub fn stop(self) -> Result<(), AlreadyStoppedError> {
269        self.handle.stop()
270    }
271
272    /// Returns the url to the http server
273    pub fn http_url(&self) -> String {
274        format!("http://{}", self.local_addr)
275    }
276
277    /// Returns the url to the ws server
278    pub fn ws_url(&self) -> String {
279        format!("ws://{}", self.local_addr)
280    }
281
282    /// Returns a http client connected to the server.
283    pub fn http_client(
284        &self,
285    ) -> jsonrpsee::http_client::HttpClient<AuthClientService<HttpBackend>> {
286        // Create a middleware that adds a new JWT token to every request.
287        let secret_layer = AuthClientLayer::new(self.secret);
288        let middleware = tower::ServiceBuilder::default().layer(secret_layer);
289        jsonrpsee::http_client::HttpClientBuilder::default()
290            .set_http_middleware(middleware)
291            .build(self.http_url())
292            .expect("Failed to create http client")
293    }
294
295    /// Returns a ws client connected to the server. Note that the connection can only be
296    /// be established within 1 minute due to the JWT token expiration.
297    pub async fn ws_client(&self) -> jsonrpsee::ws_client::WsClient {
298        jsonrpsee::ws_client::WsClientBuilder::default()
299            .set_headers(HeaderMap::from_iter([(
300                AUTHORIZATION,
301                secret_to_bearer_header(&self.secret),
302            )]))
303            .build(self.ws_url())
304            .await
305            .expect("Failed to create ws client")
306    }
307
308    /// Returns an ipc client connected to the server.
309    #[cfg(unix)]
310    pub async fn ipc_client(&self) -> Option<jsonrpsee::async_client::Client> {
311        use reth_ipc::client::IpcClientBuilder;
312
313        if let Some(ipc_endpoint) = &self.ipc_endpoint {
314            return Some(
315                IpcClientBuilder::default()
316                    .build(ipc_endpoint)
317                    .await
318                    .expect("Failed to create ipc client"),
319            )
320        }
321        None
322    }
323
324    /// Returns an ipc handle
325    pub fn ipc_handle(&self) -> Option<jsonrpsee::server::ServerHandle> {
326        self.ipc_handle.clone()
327    }
328
329    /// Return an ipc endpoint
330    pub fn ipc_endpoint(&self) -> Option<String> {
331        self.ipc_endpoint.clone()
332    }
333}