1#![doc(
15 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
16 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
17 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
18)]
19#![cfg_attr(not(test), warn(unused_crate_dependencies))]
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
23use alloy_network::{Ethereum, IntoWallet};
24use alloy_provider::{fillers::RecommendedFillers, Provider, ProviderBuilder};
25use core::marker::PhantomData;
26use error::{ConflictingModules, RpcError, ServerKind};
27use http::{header::AUTHORIZATION, HeaderMap};
28use jsonrpsee::{
29 core::RegisterMethodError,
30 server::{middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider, ServerHandle},
31 Methods, RpcModule,
32};
33use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
34use reth_consensus::{ConsensusError, FullConsensus};
35use reth_evm::ConfigureEvm;
36use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
37use reth_primitives_traits::{NodePrimitives, TxTy};
38use reth_rpc::{
39 AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
40 OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, ValidationApiConfig, Web3Api,
41};
42use reth_rpc_api::servers::*;
43use reth_rpc_eth_api::{
44 helpers::{
45 pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
46 TraceExt,
47 },
48 node::RpcNodeCoreAdapter,
49 EthApiServer, EthApiTypes, FullEthApiServer, RpcBlock, RpcConvert, RpcConverter, RpcHeader,
50 RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
51};
52use reth_rpc_eth_types::{receipt::EthReceiptConverter, EthConfig, EthSubscriptionIdProvider};
53use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
54use reth_storage_api::{
55 AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, ProviderBlock,
56 StateProviderFactory,
57};
58use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
59use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
60use serde::{Deserialize, Serialize};
61use std::{
62 collections::HashMap,
63 fmt::Debug,
64 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
65 time::{Duration, SystemTime, UNIX_EPOCH},
66};
67use tower_http::cors::CorsLayer;
68
69pub use cors::CorsDomainError;
70
71pub use jsonrpsee::server::ServerBuilder;
73use jsonrpsee::server::ServerConfigBuilder;
74pub use reth_ipc::server::{
75 Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
76};
77pub use reth_rpc_server_types::{constants, RethRpcModule, RpcModuleSelection};
78pub use tower::layer::util::{Identity, Stack};
79
80pub mod auth;
82
83pub mod config;
85
86pub mod middleware;
88
89mod cors;
91
92pub mod error;
94
95pub mod eth;
97pub use eth::EthHandlers;
98
99mod metrics;
101use crate::middleware::RethRpcMiddleware;
102pub use metrics::{MeteredRequestFuture, RpcRequestMetricsService};
103use reth_chain_state::CanonStateSubscriptions;
104use reth_rpc::eth::sim_bundle::EthSimBundle;
105
106pub mod rate_limiter;
108
109#[derive(Debug, Clone)]
113pub struct RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus> {
114 provider: Provider,
116 pool: Pool,
118 network: Network,
120 executor: Box<dyn TaskSpawner + 'static>,
122 evm_config: EvmConfig,
124 consensus: Consensus,
126 _primitives: PhantomData<N>,
128}
129
130impl<N, Provider, Pool, Network, EvmConfig, Consensus>
133 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
134{
135 pub const fn new(
137 provider: Provider,
138 pool: Pool,
139 network: Network,
140 executor: Box<dyn TaskSpawner + 'static>,
141 evm_config: EvmConfig,
142 consensus: Consensus,
143 ) -> Self {
144 Self { provider, pool, network, executor, evm_config, consensus, _primitives: PhantomData }
145 }
146
147 pub fn with_provider<P>(
149 self,
150 provider: P,
151 ) -> RpcModuleBuilder<N, P, Pool, Network, EvmConfig, Consensus> {
152 let Self { pool, network, executor, evm_config, consensus, _primitives, .. } = self;
153 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
154 }
155
156 pub fn with_pool<P>(
158 self,
159 pool: P,
160 ) -> RpcModuleBuilder<N, Provider, P, Network, EvmConfig, Consensus> {
161 let Self { provider, network, executor, evm_config, consensus, _primitives, .. } = self;
162 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
163 }
164
165 pub fn with_noop_pool(
171 self,
172 ) -> RpcModuleBuilder<N, Provider, NoopTransactionPool, Network, EvmConfig, Consensus> {
173 let Self { provider, executor, network, evm_config, consensus, _primitives, .. } = self;
174 RpcModuleBuilder {
175 provider,
176 executor,
177 network,
178 evm_config,
179 pool: NoopTransactionPool::default(),
180 consensus,
181 _primitives,
182 }
183 }
184
185 pub fn with_network<Net>(
187 self,
188 network: Net,
189 ) -> RpcModuleBuilder<N, Provider, Pool, Net, EvmConfig, Consensus> {
190 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
191 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
192 }
193
194 pub fn with_noop_network(
200 self,
201 ) -> RpcModuleBuilder<N, Provider, Pool, NoopNetwork, EvmConfig, Consensus> {
202 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
203 RpcModuleBuilder {
204 provider,
205 pool,
206 executor,
207 network: NoopNetwork::default(),
208 evm_config,
209 consensus,
210 _primitives,
211 }
212 }
213
214 pub fn with_executor(self, executor: Box<dyn TaskSpawner + 'static>) -> Self {
216 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
217 Self { provider, network, pool, executor, evm_config, consensus, _primitives }
218 }
219
220 pub fn with_tokio_executor(self) -> Self {
225 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
226 Self {
227 provider,
228 network,
229 pool,
230 executor: Box::new(TokioTaskExecutor::default()),
231 evm_config,
232 consensus,
233 _primitives,
234 }
235 }
236
237 pub fn with_evm_config<E>(
239 self,
240 evm_config: E,
241 ) -> RpcModuleBuilder<N, Provider, Pool, Network, E, Consensus> {
242 let Self { provider, pool, executor, network, consensus, _primitives, .. } = self;
243 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
244 }
245
246 pub fn with_consensus<C>(
248 self,
249 consensus: C,
250 ) -> RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, C> {
251 let Self { provider, network, pool, executor, evm_config, _primitives, .. } = self;
252 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
253 }
254
255 #[expect(clippy::type_complexity)]
257 pub fn eth_api_builder<ChainSpec>(
258 &self,
259 ) -> EthApiBuilder<
260 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
261 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
262 >
263 where
264 Provider: Clone,
265 Pool: Clone,
266 Network: Clone,
267 EvmConfig: Clone,
268 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
269 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
270 {
271 EthApiBuilder::new(
272 self.provider.clone(),
273 self.pool.clone(),
274 self.network.clone(),
275 self.evm_config.clone(),
276 )
277 }
278
279 #[expect(clippy::type_complexity)]
285 pub fn bootstrap_eth_api<ChainSpec>(
286 &self,
287 ) -> EthApi<
288 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
289 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
290 >
291 where
292 Provider: Clone,
293 Pool: Clone,
294 Network: Clone,
295 EvmConfig: ConfigureEvm + Clone,
296 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
297 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
298 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>: RpcConvert,
299 (): PendingEnvBuilder<EvmConfig>,
300 {
301 self.eth_api_builder().build()
302 }
303}
304
305impl<N, Provider, Pool, Network, EvmConfig, Consensus>
306 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
307where
308 N: NodePrimitives,
309 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
310 + CanonStateSubscriptions<Primitives = N>
311 + AccountReader
312 + ChangeSetReader,
313 Pool: TransactionPool + Clone + 'static,
314 Network: NetworkInfo + Peers + Clone + 'static,
315 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
316 Consensus: FullConsensus<N, Error = ConsensusError> + Clone + 'static,
317{
318 pub fn build_with_auth_server<EthApi>(
325 self,
326 module_config: TransportRpcModuleConfig,
327 engine: impl IntoEngineApiRpcModule,
328 eth: EthApi,
329 ) -> (
330 TransportRpcModules,
331 AuthRpcModule,
332 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>,
333 )
334 where
335 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
336 {
337 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
338
339 let config = module_config.config.clone().unwrap_or_default();
340
341 let mut registry = RpcRegistryInner::new(
342 provider, pool, network, executor, consensus, config, evm_config, eth,
343 );
344
345 let modules = registry.create_transport_rpc_modules(module_config);
346
347 let auth_module = registry.create_auth_module(engine);
348
349 (modules, auth_module, registry)
350 }
351
352 pub fn into_registry<EthApi>(
357 self,
358 config: RpcModuleConfig,
359 eth: EthApi,
360 ) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
361 where
362 EthApi: EthApiTypes + 'static,
363 {
364 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
365 RpcRegistryInner::new(provider, pool, network, executor, consensus, config, evm_config, eth)
366 }
367
368 pub fn build<EthApi>(
371 self,
372 module_config: TransportRpcModuleConfig,
373 eth: EthApi,
374 ) -> TransportRpcModules<()>
375 where
376 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
377 {
378 let mut modules = TransportRpcModules::default();
379
380 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
381
382 if !module_config.is_empty() {
383 let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
384
385 let mut registry = RpcRegistryInner::new(
386 provider,
387 pool,
388 network,
389 executor,
390 consensus,
391 config.unwrap_or_default(),
392 evm_config,
393 eth,
394 );
395
396 modules.config = module_config;
397 modules.http = registry.maybe_module(http.as_ref());
398 modules.ws = registry.maybe_module(ws.as_ref());
399 modules.ipc = registry.maybe_module(ipc.as_ref());
400 }
401
402 modules
403 }
404}
405
406impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
407 fn default() -> Self {
408 Self::new((), (), (), Box::new(TokioTaskExecutor::default()), (), ())
409 }
410}
411
412#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
414pub struct RpcModuleConfig {
415 eth: EthConfig,
417 flashbots: ValidationApiConfig,
419}
420
421impl RpcModuleConfig {
424 pub fn builder() -> RpcModuleConfigBuilder {
426 RpcModuleConfigBuilder::default()
427 }
428
429 pub const fn new(eth: EthConfig, flashbots: ValidationApiConfig) -> Self {
431 Self { eth, flashbots }
432 }
433
434 pub const fn eth(&self) -> &EthConfig {
436 &self.eth
437 }
438
439 pub const fn eth_mut(&mut self) -> &mut EthConfig {
441 &mut self.eth
442 }
443}
444
445#[derive(Clone, Debug, Default)]
447pub struct RpcModuleConfigBuilder {
448 eth: Option<EthConfig>,
449 flashbots: Option<ValidationApiConfig>,
450}
451
452impl RpcModuleConfigBuilder {
455 pub fn eth(mut self, eth: EthConfig) -> Self {
457 self.eth = Some(eth);
458 self
459 }
460
461 pub fn flashbots(mut self, flashbots: ValidationApiConfig) -> Self {
463 self.flashbots = Some(flashbots);
464 self
465 }
466
467 pub fn build(self) -> RpcModuleConfig {
469 let Self { eth, flashbots } = self;
470 RpcModuleConfig { eth: eth.unwrap_or_default(), flashbots: flashbots.unwrap_or_default() }
471 }
472
473 pub const fn get_eth(&self) -> Option<&EthConfig> {
475 self.eth.as_ref()
476 }
477
478 pub const fn eth_mut(&mut self) -> &mut Option<EthConfig> {
480 &mut self.eth
481 }
482
483 pub fn eth_mut_or_default(&mut self) -> &mut EthConfig {
485 self.eth.get_or_insert_with(EthConfig::default)
486 }
487}
488
489#[derive(Debug, Clone)]
491#[expect(dead_code)] pub struct RpcRegistryInner<
493 Provider: BlockReader,
494 Pool,
495 Network,
496 EthApi: EthApiTypes,
497 EvmConfig,
498 Consensus,
499> {
500 provider: Provider,
501 pool: Pool,
502 network: Network,
503 executor: Box<dyn TaskSpawner + 'static>,
504 evm_config: EvmConfig,
505 consensus: Consensus,
506 eth: EthHandlers<EthApi>,
508 blocking_pool_guard: BlockingTaskGuard,
510 modules: HashMap<RethRpcModule, Methods>,
512 eth_config: EthConfig,
514}
515
516impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
519 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
520where
521 N: NodePrimitives,
522 Provider: StateProviderFactory
523 + CanonStateSubscriptions<Primitives = N>
524 + BlockReader<Block = N::Block, Receipt = N::Receipt>
525 + Clone
526 + Unpin
527 + 'static,
528 Pool: Send + Sync + Clone + 'static,
529 Network: Clone + 'static,
530 EthApi: EthApiTypes + 'static,
531 EvmConfig: ConfigureEvm<Primitives = N>,
532{
533 #[expect(clippy::too_many_arguments)]
535 pub fn new(
536 provider: Provider,
537 pool: Pool,
538 network: Network,
539 executor: Box<dyn TaskSpawner + 'static>,
540 consensus: Consensus,
541 config: RpcModuleConfig,
542 evm_config: EvmConfig,
543 eth_api: EthApi,
544 ) -> Self
545 where
546 EvmConfig: ConfigureEvm<Primitives = N>,
547 {
548 let blocking_pool_guard = BlockingTaskGuard::new(config.eth.max_tracing_requests);
549
550 let eth = EthHandlers::bootstrap(config.eth.clone(), executor.clone(), eth_api);
551
552 Self {
553 provider,
554 pool,
555 network,
556 eth,
557 executor,
558 consensus,
559 modules: Default::default(),
560 blocking_pool_guard,
561 eth_config: config.eth,
562 evm_config,
563 }
564 }
565}
566
567impl<Provider, Pool, Network, EthApi, BlockExecutor, Consensus>
568 RpcRegistryInner<Provider, Pool, Network, EthApi, BlockExecutor, Consensus>
569where
570 Provider: BlockReader,
571 EthApi: EthApiTypes,
572{
573 pub const fn eth_api(&self) -> &EthApi {
575 &self.eth.api
576 }
577
578 pub const fn eth_handlers(&self) -> &EthHandlers<EthApi> {
580 &self.eth
581 }
582
583 pub const fn pool(&self) -> &Pool {
585 &self.pool
586 }
587
588 pub const fn tasks(&self) -> &(dyn TaskSpawner + 'static) {
590 &*self.executor
591 }
592
593 pub const fn provider(&self) -> &Provider {
595 &self.provider
596 }
597
598 pub fn methods(&self) -> Vec<Methods> {
600 self.modules.values().cloned().collect()
601 }
602
603 pub fn module(&self) -> RpcModule<()> {
605 let mut module = RpcModule::new(());
606 for methods in self.modules.values().cloned() {
607 module.merge(methods).expect("No conflicts");
608 }
609 module
610 }
611}
612
613impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
614 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
615where
616 Network: NetworkInfo + Clone + 'static,
617 EthApi: EthApiTypes,
618 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks>,
619 EvmConfig: ConfigureEvm,
620{
621 pub fn admin_api(&self) -> AdminApi<Network, Provider::ChainSpec, Pool>
623 where
624 Network: Peers,
625 Pool: TransactionPool + Clone + 'static,
626 {
627 AdminApi::new(self.network.clone(), self.provider.chain_spec(), self.pool.clone())
628 }
629
630 pub fn web3_api(&self) -> Web3Api<Network> {
632 Web3Api::new(self.network.clone())
633 }
634
635 pub fn register_admin(&mut self) -> &mut Self
637 where
638 Network: Peers,
639 Pool: TransactionPool + Clone + 'static,
640 {
641 let adminapi = self.admin_api();
642 self.modules.insert(RethRpcModule::Admin, adminapi.into_rpc().into());
643 self
644 }
645
646 pub fn register_web3(&mut self) -> &mut Self {
648 let web3api = self.web3_api();
649 self.modules.insert(RethRpcModule::Web3, web3api.into_rpc().into());
650 self
651 }
652}
653
654impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
655 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
656where
657 N: NodePrimitives,
658 Provider: FullRpcProvider<
659 Header = N::BlockHeader,
660 Block = N::Block,
661 Receipt = N::Receipt,
662 Transaction = N::SignedTx,
663 > + AccountReader
664 + ChangeSetReader
665 + CanonStateSubscriptions,
666 Network: NetworkInfo + Peers + Clone + 'static,
667 EthApi: EthApiServer<
668 RpcTxReq<EthApi::NetworkTypes>,
669 RpcTransaction<EthApi::NetworkTypes>,
670 RpcBlock<EthApi::NetworkTypes>,
671 RpcReceipt<EthApi::NetworkTypes>,
672 RpcHeader<EthApi::NetworkTypes>,
673 TxTy<N>,
674 > + EthApiTypes,
675 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
676{
677 pub fn register_eth(&mut self) -> &mut Self {
683 let eth_api = self.eth_api().clone();
684 self.modules.insert(RethRpcModule::Eth, eth_api.into_rpc().into());
685 self
686 }
687
688 pub fn register_ots(&mut self) -> &mut Self
694 where
695 EthApi: TraceExt + EthTransactions<Primitives = N>,
696 {
697 let otterscan_api = self.otterscan_api();
698 self.modules.insert(RethRpcModule::Ots, otterscan_api.into_rpc().into());
699 self
700 }
701
702 pub fn register_debug(&mut self) -> &mut Self
708 where
709 EthApi: EthApiSpec + EthTransactions + TraceExt,
710 EvmConfig::Primitives: NodePrimitives<Block = ProviderBlock<EthApi::Provider>>,
711 {
712 let debug_api = self.debug_api();
713 self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
714 self
715 }
716
717 pub fn register_trace(&mut self) -> &mut Self
723 where
724 EthApi: TraceExt,
725 {
726 let trace_api = self.trace_api();
727 self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
728 self
729 }
730
731 pub fn register_net(&mut self) -> &mut Self
739 where
740 EthApi: EthApiSpec + 'static,
741 {
742 let netapi = self.net_api();
743 self.modules.insert(RethRpcModule::Net, netapi.into_rpc().into());
744 self
745 }
746
747 pub fn register_reth(&mut self) -> &mut Self {
755 let rethapi = self.reth_api();
756 self.modules.insert(RethRpcModule::Reth, rethapi.into_rpc().into());
757 self
758 }
759
760 pub fn otterscan_api(&self) -> OtterscanApi<EthApi> {
766 let eth_api = self.eth_api().clone();
767 OtterscanApi::new(eth_api)
768 }
769}
770
771impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
772 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
773where
774 N: NodePrimitives,
775 Provider: FullRpcProvider<
776 Block = N::Block,
777 Header = N::BlockHeader,
778 Transaction = N::SignedTx,
779 Receipt = N::Receipt,
780 > + AccountReader
781 + ChangeSetReader,
782 Network: NetworkInfo + Peers + Clone + 'static,
783 EthApi: EthApiTypes,
784 EvmConfig: ConfigureEvm<Primitives = N>,
785{
786 pub fn trace_api(&self) -> TraceApi<EthApi> {
792 TraceApi::new(
793 self.eth_api().clone(),
794 self.blocking_pool_guard.clone(),
795 self.eth_config.clone(),
796 )
797 }
798
799 pub fn bundle_api(&self) -> EthBundle<EthApi>
805 where
806 EthApi: EthTransactions + LoadPendingBlock + Call,
807 {
808 let eth_api = self.eth_api().clone();
809 EthBundle::new(eth_api, self.blocking_pool_guard.clone())
810 }
811
812 pub fn debug_api(&self) -> DebugApi<EthApi> {
818 DebugApi::new(self.eth_api().clone(), self.blocking_pool_guard.clone())
819 }
820
821 pub fn net_api(&self) -> NetApi<Network, EthApi>
827 where
828 EthApi: EthApiSpec + 'static,
829 {
830 let eth_api = self.eth_api().clone();
831 NetApi::new(self.network.clone(), eth_api)
832 }
833
834 pub fn reth_api(&self) -> RethApi<Provider> {
836 RethApi::new(self.provider.clone(), self.executor.clone())
837 }
838}
839
840impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
841 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
842where
843 N: NodePrimitives,
844 Provider: FullRpcProvider<Block = N::Block>
845 + CanonStateSubscriptions<Primitives = N>
846 + AccountReader
847 + ChangeSetReader,
848 Pool: TransactionPool + Clone + 'static,
849 Network: NetworkInfo + Peers + Clone + 'static,
850 EthApi: FullEthApiServer,
851 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
852 Consensus: FullConsensus<N, Error = ConsensusError> + Clone + 'static,
853{
854 pub fn create_auth_module(&self, engine_api: impl IntoEngineApiRpcModule) -> AuthRpcModule {
860 let mut module = engine_api.into_rpc_module();
861
862 let eth_handlers = self.eth_handlers();
864 let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
865
866 module.merge(engine_eth.into_rpc()).expect("No conflicting methods");
867
868 AuthRpcModule { inner: module }
869 }
870
871 fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
873 config.map(|config| self.module_for(config))
874 }
875
876 pub fn create_transport_rpc_modules(
880 &mut self,
881 config: TransportRpcModuleConfig,
882 ) -> TransportRpcModules<()> {
883 let mut modules = TransportRpcModules::default();
884 let http = self.maybe_module(config.http.as_ref());
885 let ws = self.maybe_module(config.ws.as_ref());
886 let ipc = self.maybe_module(config.ipc.as_ref());
887
888 modules.config = config;
889 modules.http = http;
890 modules.ws = ws;
891 modules.ipc = ipc;
892 modules
893 }
894
895 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
898 let mut module = RpcModule::new(());
899 let all_methods = self.reth_methods(config.iter_selection());
900 for methods in all_methods {
901 module.merge(methods).expect("No conflicts");
902 }
903 module
904 }
905
906 pub fn reth_methods(
915 &mut self,
916 namespaces: impl Iterator<Item = RethRpcModule>,
917 ) -> Vec<Methods> {
918 let EthHandlers { api: eth_api, filter: eth_filter, pubsub: eth_pubsub, .. } =
919 self.eth_handlers().clone();
920
921 let namespaces: Vec<_> = namespaces.collect();
923 namespaces
924 .iter()
925 .map(|namespace| {
926 self.modules
927 .entry(namespace.clone())
928 .or_insert_with(|| match namespace.clone() {
929 RethRpcModule::Admin => AdminApi::new(
930 self.network.clone(),
931 self.provider.chain_spec(),
932 self.pool.clone(),
933 )
934 .into_rpc()
935 .into(),
936 RethRpcModule::Debug => {
937 DebugApi::new(eth_api.clone(), self.blocking_pool_guard.clone())
938 .into_rpc()
939 .into()
940 }
941 RethRpcModule::Eth => {
942 let mut module = eth_api.clone().into_rpc();
944 module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
945 module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
946 module
947 .merge(
948 EthBundle::new(
949 eth_api.clone(),
950 self.blocking_pool_guard.clone(),
951 )
952 .into_rpc(),
953 )
954 .expect("No conflicts");
955
956 module.into()
957 }
958 RethRpcModule::Net => {
959 NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
960 }
961 RethRpcModule::Trace => TraceApi::new(
962 eth_api.clone(),
963 self.blocking_pool_guard.clone(),
964 self.eth_config.clone(),
965 )
966 .into_rpc()
967 .into(),
968 RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
969 RethRpcModule::Txpool => TxPoolApi::new(
970 self.eth.api.pool().clone(),
971 dyn_clone::clone(self.eth.api.tx_resp_builder()),
972 )
973 .into_rpc()
974 .into(),
975 RethRpcModule::Rpc => RPCApi::new(
976 namespaces
977 .iter()
978 .map(|module| (module.to_string(), "1.0".to_string()))
979 .collect(),
980 )
981 .into_rpc()
982 .into(),
983 RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
984 RethRpcModule::Reth => {
985 RethApi::new(self.provider.clone(), self.executor.clone())
986 .into_rpc()
987 .into()
988 }
989 RethRpcModule::Flashbots | RethRpcModule::Other(_) => Default::default(),
995 RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
996 RethRpcModule::Mev => {
997 EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
998 .into_rpc()
999 .into()
1000 }
1001 })
1002 .clone()
1003 })
1004 .collect::<Vec<_>>()
1005 }
1006}
1007
1008#[derive(Debug)]
1020pub struct RpcServerConfig<RpcMiddleware = Identity> {
1021 http_server_config: Option<ServerConfigBuilder>,
1023 http_cors_domains: Option<String>,
1025 http_addr: Option<SocketAddr>,
1027 http_disable_compression: bool,
1029 ws_server_config: Option<ServerConfigBuilder>,
1031 ws_cors_domains: Option<String>,
1033 ws_addr: Option<SocketAddr>,
1035 ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
1037 ipc_endpoint: Option<String>,
1039 jwt_secret: Option<JwtSecret>,
1041 rpc_middleware: RpcMiddleware,
1043}
1044
1045impl Default for RpcServerConfig<Identity> {
1048 fn default() -> Self {
1050 Self {
1051 http_server_config: None,
1052 http_cors_domains: None,
1053 http_addr: None,
1054 http_disable_compression: false,
1055 ws_server_config: None,
1056 ws_cors_domains: None,
1057 ws_addr: None,
1058 ipc_server_config: None,
1059 ipc_endpoint: None,
1060 jwt_secret: None,
1061 rpc_middleware: Default::default(),
1062 }
1063 }
1064}
1065
1066impl RpcServerConfig {
1067 pub fn http(config: ServerConfigBuilder) -> Self {
1069 Self::default().with_http(config)
1070 }
1071
1072 pub fn ws(config: ServerConfigBuilder) -> Self {
1074 Self::default().with_ws(config)
1075 }
1076
1077 pub fn ipc(config: IpcServerBuilder<Identity, Identity>) -> Self {
1079 Self::default().with_ipc(config)
1080 }
1081
1082 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
1087 self.http_server_config =
1088 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1089 self
1090 }
1091
1092 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
1097 self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1098 self
1099 }
1100
1101 pub fn with_ipc(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
1106 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1107 self
1108 }
1109}
1110
1111impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
1112 pub fn set_rpc_middleware<T>(self, rpc_middleware: T) -> RpcServerConfig<T> {
1114 RpcServerConfig {
1115 http_server_config: self.http_server_config,
1116 http_cors_domains: self.http_cors_domains,
1117 http_addr: self.http_addr,
1118 http_disable_compression: self.http_disable_compression,
1119 ws_server_config: self.ws_server_config,
1120 ws_cors_domains: self.ws_cors_domains,
1121 ws_addr: self.ws_addr,
1122 ipc_server_config: self.ipc_server_config,
1123 ipc_endpoint: self.ipc_endpoint,
1124 jwt_secret: self.jwt_secret,
1125 rpc_middleware,
1126 }
1127 }
1128
1129 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
1131 self.with_http_cors(cors_domain.clone()).with_ws_cors(cors_domain)
1132 }
1133
1134 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
1136 self.ws_cors_domains = cors_domain;
1137 self
1138 }
1139
1140 pub const fn with_http_disable_compression(mut self, http_disable_compression: bool) -> Self {
1142 self.http_disable_compression = http_disable_compression;
1143 self
1144 }
1145
1146 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
1148 self.http_cors_domains = cors_domain;
1149 self
1150 }
1151
1152 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
1157 self.http_addr = Some(addr);
1158 self
1159 }
1160
1161 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
1166 self.ws_addr = Some(addr);
1167 self
1168 }
1169
1170 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
1174 where
1175 I: IdProvider + Clone + 'static,
1176 {
1177 if let Some(config) = self.http_server_config {
1178 self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
1179 }
1180 if let Some(config) = self.ws_server_config {
1181 self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
1182 }
1183 if let Some(ipc) = self.ipc_server_config {
1184 self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
1185 }
1186
1187 self
1188 }
1189
1190 pub fn with_ipc_endpoint(mut self, path: impl Into<String>) -> Self {
1194 self.ipc_endpoint = Some(path.into());
1195 self
1196 }
1197
1198 pub const fn with_jwt_secret(mut self, secret: Option<JwtSecret>) -> Self {
1200 self.jwt_secret = secret;
1201 self
1202 }
1203
1204 pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
1206 let Some(tokio_runtime) = tokio_runtime else { return self };
1207 if let Some(http_server_config) = self.http_server_config {
1208 self.http_server_config =
1209 Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1210 }
1211 if let Some(ws_server_config) = self.ws_server_config {
1212 self.ws_server_config =
1213 Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1214 }
1215 if let Some(ipc_server_config) = self.ipc_server_config {
1216 self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
1217 }
1218 self
1219 }
1220
1221 pub const fn has_server(&self) -> bool {
1225 self.http_server_config.is_some() ||
1226 self.ws_server_config.is_some() ||
1227 self.ipc_server_config.is_some()
1228 }
1229
1230 pub const fn http_address(&self) -> Option<SocketAddr> {
1232 self.http_addr
1233 }
1234
1235 pub const fn ws_address(&self) -> Option<SocketAddr> {
1237 self.ws_addr
1238 }
1239
1240 pub fn ipc_endpoint(&self) -> Option<String> {
1242 self.ipc_endpoint.clone()
1243 }
1244
1245 fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
1247 cors.as_deref().map(cors::create_cors_layer).transpose()
1248 }
1249
1250 fn maybe_jwt_layer(jwt_secret: Option<JwtSecret>) -> Option<AuthLayer<JwtAuthValidator>> {
1252 jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
1253 }
1254
1255 fn maybe_compression_layer(disable_compression: bool) -> Option<CompressionLayer> {
1258 if disable_compression {
1259 None
1260 } else {
1261 Some(CompressionLayer::new())
1262 }
1263 }
1264
1265 pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError>
1271 where
1272 RpcMiddleware: RethRpcMiddleware,
1273 {
1274 let mut http_handle = None;
1275 let mut ws_handle = None;
1276 let mut ipc_handle = None;
1277
1278 let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
1279 Ipv4Addr::LOCALHOST,
1280 constants::DEFAULT_HTTP_RPC_PORT,
1281 )));
1282
1283 let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
1284 Ipv4Addr::LOCALHOST,
1285 constants::DEFAULT_WS_RPC_PORT,
1286 )));
1287
1288 let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
1289 let ipc_path =
1290 self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());
1291
1292 if let Some(builder) = self.ipc_server_config {
1293 let ipc = builder
1294 .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
1295 .build(ipc_path);
1296 ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
1297 }
1298
1299 if self.http_addr == self.ws_addr &&
1301 self.http_server_config.is_some() &&
1302 self.ws_server_config.is_some()
1303 {
1304 let cors = match (self.ws_cors_domains.as_ref(), self.http_cors_domains.as_ref()) {
1305 (Some(ws_cors), Some(http_cors)) => {
1306 if ws_cors.trim() != http_cors.trim() {
1307 return Err(WsHttpSamePortError::ConflictingCorsDomains {
1308 http_cors_domains: Some(http_cors.clone()),
1309 ws_cors_domains: Some(ws_cors.clone()),
1310 }
1311 .into());
1312 }
1313 Some(ws_cors)
1314 }
1315 (a, b) => a.or(b),
1316 }
1317 .cloned();
1318
1319 modules.config.ensure_ws_http_identical()?;
1321
1322 if let Some(config) = self.http_server_config {
1323 let server = ServerBuilder::new()
1324 .set_http_middleware(
1325 tower::ServiceBuilder::new()
1326 .option_layer(Self::maybe_cors_layer(cors)?)
1327 .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
1328 .option_layer(Self::maybe_compression_layer(
1329 self.http_disable_compression,
1330 )),
1331 )
1332 .set_rpc_middleware(
1333 RpcServiceBuilder::default()
1334 .layer(
1335 modules
1336 .http
1337 .as_ref()
1338 .or(modules.ws.as_ref())
1339 .map(RpcRequestMetrics::same_port)
1340 .unwrap_or_default(),
1341 )
1342 .layer(self.rpc_middleware.clone()),
1343 )
1344 .set_config(config.build())
1345 .build(http_socket_addr)
1346 .await
1347 .map_err(|err| {
1348 RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
1349 })?;
1350 let addr = server.local_addr().map_err(|err| {
1351 RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
1352 })?;
1353 if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
1354 let handle = server.start(module.clone());
1355 http_handle = Some(handle.clone());
1356 ws_handle = Some(handle);
1357 }
1358 return Ok(RpcServerHandle {
1359 http_local_addr: Some(addr),
1360 ws_local_addr: Some(addr),
1361 http: http_handle,
1362 ws: ws_handle,
1363 ipc_endpoint: self.ipc_endpoint.clone(),
1364 ipc: ipc_handle,
1365 jwt_secret: self.jwt_secret,
1366 });
1367 }
1368 }
1369
1370 let mut ws_local_addr = None;
1371 let mut ws_server = None;
1372 let mut http_local_addr = None;
1373 let mut http_server = None;
1374
1375 if let Some(config) = self.ws_server_config {
1376 let server = ServerBuilder::new()
1377 .set_config(config.ws_only().build())
1378 .set_http_middleware(
1379 tower::ServiceBuilder::new()
1380 .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
1381 .option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
1382 )
1383 .set_rpc_middleware(
1384 RpcServiceBuilder::default()
1385 .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default())
1386 .layer(self.rpc_middleware.clone()),
1387 )
1388 .build(ws_socket_addr)
1389 .await
1390 .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
1391
1392 let addr = server
1393 .local_addr()
1394 .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
1395
1396 ws_local_addr = Some(addr);
1397 ws_server = Some(server);
1398 }
1399
1400 if let Some(config) = self.http_server_config {
1401 let server = ServerBuilder::new()
1402 .set_config(config.http_only().build())
1403 .set_http_middleware(
1404 tower::ServiceBuilder::new()
1405 .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
1406 .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
1407 .option_layer(Self::maybe_compression_layer(self.http_disable_compression)),
1408 )
1409 .set_rpc_middleware(
1410 RpcServiceBuilder::default()
1411 .layer(
1412 modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
1413 )
1414 .layer(self.rpc_middleware.clone()),
1415 )
1416 .build(http_socket_addr)
1417 .await
1418 .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
1419 let local_addr = server
1420 .local_addr()
1421 .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
1422 http_local_addr = Some(local_addr);
1423 http_server = Some(server);
1424 }
1425
1426 http_handle = http_server
1427 .map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
1428 ws_handle = ws_server
1429 .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
1430 Ok(RpcServerHandle {
1431 http_local_addr,
1432 ws_local_addr,
1433 http: http_handle,
1434 ws: ws_handle,
1435 ipc_endpoint: self.ipc_endpoint.clone(),
1436 ipc: ipc_handle,
1437 jwt_secret: self.jwt_secret,
1438 })
1439 }
1440}
1441
1442#[derive(Debug, Clone, Default, Eq, PartialEq)]
1454pub struct TransportRpcModuleConfig {
1455 http: Option<RpcModuleSelection>,
1457 ws: Option<RpcModuleSelection>,
1459 ipc: Option<RpcModuleSelection>,
1461 config: Option<RpcModuleConfig>,
1463}
1464
1465impl TransportRpcModuleConfig {
1468 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
1470 Self::default().with_http(http)
1471 }
1472
1473 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
1475 Self::default().with_ws(ws)
1476 }
1477
1478 pub fn set_ipc(ipc: impl Into<RpcModuleSelection>) -> Self {
1480 Self::default().with_ipc(ipc)
1481 }
1482
1483 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
1485 self.http = Some(http.into());
1486 self
1487 }
1488
1489 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
1491 self.ws = Some(ws.into());
1492 self
1493 }
1494
1495 pub fn with_ipc(mut self, ipc: impl Into<RpcModuleSelection>) -> Self {
1497 self.ipc = Some(ipc.into());
1498 self
1499 }
1500
1501 pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
1503 self.config = Some(config);
1504 self
1505 }
1506
1507 pub const fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1509 &mut self.http
1510 }
1511
1512 pub const fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1514 &mut self.ws
1515 }
1516
1517 pub const fn ipc_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1519 &mut self.ipc
1520 }
1521
1522 pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
1524 &mut self.config
1525 }
1526
1527 pub const fn is_empty(&self) -> bool {
1529 self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
1530 }
1531
1532 pub const fn http(&self) -> Option<&RpcModuleSelection> {
1534 self.http.as_ref()
1535 }
1536
1537 pub const fn ws(&self) -> Option<&RpcModuleSelection> {
1539 self.ws.as_ref()
1540 }
1541
1542 pub const fn ipc(&self) -> Option<&RpcModuleSelection> {
1544 self.ipc.as_ref()
1545 }
1546
1547 pub const fn config(&self) -> Option<&RpcModuleConfig> {
1549 self.config.as_ref()
1550 }
1551
1552 pub fn contains_any(&self, module: &RethRpcModule) -> bool {
1554 self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
1555 }
1556
1557 pub fn contains_http(&self, module: &RethRpcModule) -> bool {
1559 self.http.as_ref().is_some_and(|http| http.contains(module))
1560 }
1561
1562 pub fn contains_ws(&self, module: &RethRpcModule) -> bool {
1564 self.ws.as_ref().is_some_and(|ws| ws.contains(module))
1565 }
1566
1567 pub fn contains_ipc(&self, module: &RethRpcModule) -> bool {
1569 self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
1570 }
1571
1572 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
1575 if RpcModuleSelection::are_identical(self.http.as_ref(), self.ws.as_ref()) {
1576 Ok(())
1577 } else {
1578 let http_modules =
1579 self.http.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1580 let ws_modules =
1581 self.ws.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1582
1583 let http_not_ws = http_modules.difference(&ws_modules).cloned().collect();
1584 let ws_not_http = ws_modules.difference(&http_modules).cloned().collect();
1585 let overlap = http_modules.intersection(&ws_modules).cloned().collect();
1586
1587 Err(WsHttpSamePortError::ConflictingModules(Box::new(ConflictingModules {
1588 overlap,
1589 http_not_ws,
1590 ws_not_http,
1591 })))
1592 }
1593 }
1594}
1595
1596#[derive(Debug, Clone, Default)]
1598pub struct TransportRpcModules<Context = ()> {
1599 config: TransportRpcModuleConfig,
1601 http: Option<RpcModule<Context>>,
1603 ws: Option<RpcModule<Context>>,
1605 ipc: Option<RpcModule<Context>>,
1607}
1608
1609impl TransportRpcModules {
1612 pub fn with_config(mut self, config: TransportRpcModuleConfig) -> Self {
1615 self.config = config;
1616 self
1617 }
1618
1619 pub fn with_http(mut self, http: RpcModule<()>) -> Self {
1622 self.http = Some(http);
1623 self
1624 }
1625
1626 pub fn with_ws(mut self, ws: RpcModule<()>) -> Self {
1629 self.ws = Some(ws);
1630 self
1631 }
1632
1633 pub fn with_ipc(mut self, ipc: RpcModule<()>) -> Self {
1636 self.ipc = Some(ipc);
1637 self
1638 }
1639
1640 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
1642 &self.config
1643 }
1644
1645 pub fn merge_if_module_configured(
1650 &mut self,
1651 module: RethRpcModule,
1652 other: impl Into<Methods>,
1653 ) -> Result<(), RegisterMethodError> {
1654 let other = other.into();
1655 if self.module_config().contains_http(&module) {
1656 self.merge_http(other.clone())?;
1657 }
1658 if self.module_config().contains_ws(&module) {
1659 self.merge_ws(other.clone())?;
1660 }
1661 if self.module_config().contains_ipc(&module) {
1662 self.merge_ipc(other)?;
1663 }
1664
1665 Ok(())
1666 }
1667
1668 pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1674 if let Some(ref mut http) = self.http {
1675 return http.merge(other.into()).map(|_| true)
1676 }
1677 Ok(false)
1678 }
1679
1680 pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1686 if let Some(ref mut ws) = self.ws {
1687 return ws.merge(other.into()).map(|_| true)
1688 }
1689 Ok(false)
1690 }
1691
1692 pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1698 if let Some(ref mut ipc) = self.ipc {
1699 return ipc.merge(other.into()).map(|_| true)
1700 }
1701 Ok(false)
1702 }
1703
1704 pub fn merge_configured(
1708 &mut self,
1709 other: impl Into<Methods>,
1710 ) -> Result<(), RegisterMethodError> {
1711 let other = other.into();
1712 self.merge_http(other.clone())?;
1713 self.merge_ws(other.clone())?;
1714 self.merge_ipc(other)?;
1715 Ok(())
1716 }
1717
1718 pub fn methods_by_module(&self, module: RethRpcModule) -> Methods {
1722 self.methods_by(|name| name.starts_with(module.as_str()))
1723 }
1724
1725 pub fn methods_by<F>(&self, mut filter: F) -> Methods
1729 where
1730 F: FnMut(&str) -> bool,
1731 {
1732 let mut methods = Methods::new();
1733
1734 let mut f =
1736 |name: &str, mm: &Methods| filter(name) && !mm.method_names().any(|m| m == name);
1737
1738 if let Some(m) = self.http_methods(|name| f(name, &methods)) {
1739 let _ = methods.merge(m);
1740 }
1741 if let Some(m) = self.ws_methods(|name| f(name, &methods)) {
1742 let _ = methods.merge(m);
1743 }
1744 if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
1745 let _ = methods.merge(m);
1746 }
1747 methods
1748 }
1749
1750 pub fn http_methods<F>(&self, filter: F) -> Option<Methods>
1754 where
1755 F: FnMut(&str) -> bool,
1756 {
1757 self.http.as_ref().map(|module| methods_by(module, filter))
1758 }
1759
1760 pub fn ws_methods<F>(&self, filter: F) -> Option<Methods>
1764 where
1765 F: FnMut(&str) -> bool,
1766 {
1767 self.ws.as_ref().map(|module| methods_by(module, filter))
1768 }
1769
1770 pub fn ipc_methods<F>(&self, filter: F) -> Option<Methods>
1774 where
1775 F: FnMut(&str) -> bool,
1776 {
1777 self.ipc.as_ref().map(|module| methods_by(module, filter))
1778 }
1779
1780 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
1788 if let Some(http_module) = &mut self.http {
1789 http_module.remove_method(method_name).is_some()
1790 } else {
1791 false
1792 }
1793 }
1794
1795 pub fn remove_http_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1797 for name in methods {
1798 self.remove_http_method(name);
1799 }
1800 }
1801
1802 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
1810 if let Some(ws_module) = &mut self.ws {
1811 ws_module.remove_method(method_name).is_some()
1812 } else {
1813 false
1814 }
1815 }
1816
1817 pub fn remove_ws_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1819 for name in methods {
1820 self.remove_ws_method(name);
1821 }
1822 }
1823
1824 pub fn remove_ipc_method(&mut self, method_name: &'static str) -> bool {
1832 if let Some(ipc_module) = &mut self.ipc {
1833 ipc_module.remove_method(method_name).is_some()
1834 } else {
1835 false
1836 }
1837 }
1838
1839 pub fn remove_ipc_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1841 for name in methods {
1842 self.remove_ipc_method(name);
1843 }
1844 }
1845
1846 pub fn remove_method_from_configured(&mut self, method_name: &'static str) -> bool {
1850 let http_removed = self.remove_http_method(method_name);
1851 let ws_removed = self.remove_ws_method(method_name);
1852 let ipc_removed = self.remove_ipc_method(method_name);
1853
1854 http_removed || ws_removed || ipc_removed
1855 }
1856
1857 pub fn rename(
1861 &mut self,
1862 old_name: &'static str,
1863 new_method: impl Into<Methods>,
1864 ) -> Result<(), RegisterMethodError> {
1865 self.remove_method_from_configured(old_name);
1867
1868 self.merge_configured(new_method)
1870 }
1871
1872 pub fn replace_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1879 let other = other.into();
1880 self.remove_http_methods(other.method_names());
1881 self.merge_http(other)
1882 }
1883
1884 pub fn replace_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1891 let other = other.into();
1892 self.remove_ipc_methods(other.method_names());
1893 self.merge_ipc(other)
1894 }
1895
1896 pub fn replace_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1903 let other = other.into();
1904 self.remove_ws_methods(other.method_names());
1905 self.merge_ws(other)
1906 }
1907
1908 pub fn replace_configured(
1912 &mut self,
1913 other: impl Into<Methods>,
1914 ) -> Result<bool, RegisterMethodError> {
1915 let other = other.into();
1916 self.replace_http(other.clone())?;
1917 self.replace_ws(other.clone())?;
1918 self.replace_ipc(other)?;
1919 Ok(true)
1920 }
1921
1922 pub fn add_or_replace_http(
1926 &mut self,
1927 other: impl Into<Methods>,
1928 ) -> Result<bool, RegisterMethodError> {
1929 let other = other.into();
1930 self.remove_http_methods(other.method_names());
1931 self.merge_http(other)
1932 }
1933
1934 pub fn add_or_replace_ws(
1938 &mut self,
1939 other: impl Into<Methods>,
1940 ) -> Result<bool, RegisterMethodError> {
1941 let other = other.into();
1942 self.remove_ws_methods(other.method_names());
1943 self.merge_ws(other)
1944 }
1945
1946 pub fn add_or_replace_ipc(
1950 &mut self,
1951 other: impl Into<Methods>,
1952 ) -> Result<bool, RegisterMethodError> {
1953 let other = other.into();
1954 self.remove_ipc_methods(other.method_names());
1955 self.merge_ipc(other)
1956 }
1957
1958 pub fn add_or_replace_configured(
1960 &mut self,
1961 other: impl Into<Methods>,
1962 ) -> Result<(), RegisterMethodError> {
1963 let other = other.into();
1964 self.add_or_replace_http(other.clone())?;
1965 self.add_or_replace_ws(other.clone())?;
1966 self.add_or_replace_ipc(other)?;
1967 Ok(())
1968 }
1969 pub fn add_or_replace_if_module_configured(
1972 &mut self,
1973 module: RethRpcModule,
1974 other: impl Into<Methods>,
1975 ) -> Result<(), RegisterMethodError> {
1976 let other = other.into();
1977 if self.module_config().contains_http(&module) {
1978 self.add_or_replace_http(other.clone())?;
1979 }
1980 if self.module_config().contains_ws(&module) {
1981 self.add_or_replace_ws(other.clone())?;
1982 }
1983 if self.module_config().contains_ipc(&module) {
1984 self.add_or_replace_ipc(other)?;
1985 }
1986 Ok(())
1987 }
1988}
1989
1990fn methods_by<T, F>(module: &RpcModule<T>, mut filter: F) -> Methods
1992where
1993 F: FnMut(&str) -> bool,
1994{
1995 let mut methods = Methods::new();
1996 let method_names = module.method_names().filter(|name| filter(name));
1997
1998 for name in method_names {
1999 if let Some(matched_method) = module.method(name).cloned() {
2000 let _ = methods.verify_and_insert(name, matched_method);
2001 }
2002 }
2003
2004 methods
2005}
2006
2007#[derive(Clone, Debug)]
2012#[must_use = "Server stops if dropped"]
2013pub struct RpcServerHandle {
2014 http_local_addr: Option<SocketAddr>,
2016 ws_local_addr: Option<SocketAddr>,
2017 http: Option<ServerHandle>,
2018 ws: Option<ServerHandle>,
2019 ipc_endpoint: Option<String>,
2020 ipc: Option<jsonrpsee::server::ServerHandle>,
2021 jwt_secret: Option<JwtSecret>,
2022}
2023
2024impl RpcServerHandle {
2027 fn bearer_token(&self) -> Option<String> {
2029 self.jwt_secret.as_ref().map(|secret| {
2030 format!(
2031 "Bearer {}",
2032 secret
2033 .encode(&Claims {
2034 iat: (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +
2035 Duration::from_secs(60))
2036 .as_secs(),
2037 exp: None,
2038 })
2039 .unwrap()
2040 )
2041 })
2042 }
2043 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
2045 self.http_local_addr
2046 }
2047
2048 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
2050 self.ws_local_addr
2051 }
2052
2053 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
2055 if let Some(handle) = self.http {
2056 handle.stop()?
2057 }
2058
2059 if let Some(handle) = self.ws {
2060 handle.stop()?
2061 }
2062
2063 if let Some(handle) = self.ipc {
2064 handle.stop()?
2065 }
2066
2067 Ok(())
2068 }
2069
2070 pub fn ipc_endpoint(&self) -> Option<String> {
2072 self.ipc_endpoint.clone()
2073 }
2074
2075 pub fn http_url(&self) -> Option<String> {
2077 self.http_local_addr.map(|addr| format!("http://{addr}"))
2078 }
2079
2080 pub fn ws_url(&self) -> Option<String> {
2082 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
2083 }
2084
2085 pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
2087 let url = self.http_url()?;
2088
2089 let client = if let Some(token) = self.bearer_token() {
2090 jsonrpsee::http_client::HttpClientBuilder::default()
2091 .set_headers(HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]))
2092 .build(url)
2093 } else {
2094 jsonrpsee::http_client::HttpClientBuilder::default().build(url)
2095 };
2096
2097 client.expect("failed to create http client").into()
2098 }
2099
2100 pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
2102 let url = self.ws_url()?;
2103 let mut builder = jsonrpsee::ws_client::WsClientBuilder::default();
2104
2105 if let Some(token) = self.bearer_token() {
2106 let headers = HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]);
2107 builder = builder.set_headers(headers);
2108 }
2109
2110 let client = builder.build(url).await.expect("failed to create ws client");
2111 Some(client)
2112 }
2113
2114 pub fn eth_http_provider(
2116 &self,
2117 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2118 self.new_http_provider_for()
2119 }
2120
2121 pub fn eth_http_provider_with_wallet<W>(
2124 &self,
2125 wallet: W,
2126 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2127 where
2128 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2129 {
2130 let rpc_url = self.http_url()?;
2131 let provider =
2132 ProviderBuilder::new().wallet(wallet).connect_http(rpc_url.parse().expect("valid url"));
2133 Some(provider)
2134 }
2135
2136 pub fn new_http_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2141 where
2142 N: RecommendedFillers<RecommendedFillers: Unpin>,
2143 {
2144 let rpc_url = self.http_url()?;
2145 let provider = ProviderBuilder::default()
2146 .with_recommended_fillers()
2147 .connect_http(rpc_url.parse().expect("valid url"));
2148 Some(provider)
2149 }
2150
2151 pub async fn eth_ws_provider(
2153 &self,
2154 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2155 self.new_ws_provider_for().await
2156 }
2157
2158 pub async fn eth_ws_provider_with_wallet<W>(
2161 &self,
2162 wallet: W,
2163 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2164 where
2165 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2166 {
2167 let rpc_url = self.ws_url()?;
2168 let provider = ProviderBuilder::new()
2169 .wallet(wallet)
2170 .connect(&rpc_url)
2171 .await
2172 .expect("failed to create ws client");
2173 Some(provider)
2174 }
2175
2176 pub async fn new_ws_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2181 where
2182 N: RecommendedFillers<RecommendedFillers: Unpin>,
2183 {
2184 let rpc_url = self.ws_url()?;
2185 let provider = ProviderBuilder::default()
2186 .with_recommended_fillers()
2187 .connect(&rpc_url)
2188 .await
2189 .expect("failed to create ws client");
2190 Some(provider)
2191 }
2192
2193 pub async fn eth_ipc_provider(
2195 &self,
2196 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2197 self.new_ipc_provider_for().await
2198 }
2199
2200 pub async fn new_ipc_provider_for<N>(
2205 &self,
2206 ) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2207 where
2208 N: RecommendedFillers<RecommendedFillers: Unpin>,
2209 {
2210 let rpc_url = self.ipc_endpoint()?;
2211 let provider = ProviderBuilder::default()
2212 .with_recommended_fillers()
2213 .connect(&rpc_url)
2214 .await
2215 .expect("failed to create ipc client");
2216 Some(provider)
2217 }
2218}
2219
2220#[cfg(test)]
2221mod tests {
2222 use super::*;
2223
2224 #[test]
2225 fn parse_eth_call_bundle_selection() {
2226 let selection = "eth,admin,debug".parse::<RpcModuleSelection>().unwrap();
2227 assert_eq!(
2228 selection,
2229 RpcModuleSelection::Selection(
2230 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Debug,].into()
2231 )
2232 );
2233 }
2234
2235 #[test]
2236 fn parse_rpc_module_selection() {
2237 let selection = "all".parse::<RpcModuleSelection>().unwrap();
2238 assert_eq!(selection, RpcModuleSelection::All);
2239 }
2240
2241 #[test]
2242 fn parse_rpc_module_selection_none() {
2243 let selection = "none".parse::<RpcModuleSelection>().unwrap();
2244 assert_eq!(selection, RpcModuleSelection::Selection(Default::default()));
2245 }
2246
2247 #[test]
2248 fn parse_rpc_unique_module_selection() {
2249 let selection = "eth,admin,eth,net".parse::<RpcModuleSelection>().unwrap();
2250 assert_eq!(
2251 selection,
2252 RpcModuleSelection::Selection(
2253 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Net,].into()
2254 )
2255 );
2256 }
2257
2258 #[test]
2259 fn identical_selection() {
2260 assert!(RpcModuleSelection::are_identical(
2261 Some(&RpcModuleSelection::All),
2262 Some(&RpcModuleSelection::All),
2263 ));
2264 assert!(!RpcModuleSelection::are_identical(
2265 Some(&RpcModuleSelection::All),
2266 Some(&RpcModuleSelection::Standard),
2267 ));
2268 assert!(RpcModuleSelection::are_identical(
2269 Some(&RpcModuleSelection::Selection(RpcModuleSelection::Standard.to_selection())),
2270 Some(&RpcModuleSelection::Standard),
2271 ));
2272 assert!(RpcModuleSelection::are_identical(
2273 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2274 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2275 ));
2276 assert!(RpcModuleSelection::are_identical(
2277 None,
2278 Some(&RpcModuleSelection::Selection(Default::default())),
2279 ));
2280 assert!(RpcModuleSelection::are_identical(
2281 Some(&RpcModuleSelection::Selection(Default::default())),
2282 None,
2283 ));
2284 assert!(RpcModuleSelection::are_identical(None, None));
2285 }
2286
2287 #[test]
2288 fn test_rpc_module_str() {
2289 macro_rules! assert_rpc_module {
2290 ($($s:expr => $v:expr,)*) => {
2291 $(
2292 let val: RethRpcModule = $s.parse().unwrap();
2293 assert_eq!(val, $v);
2294 assert_eq!(val.to_string().as_str(), $s);
2295 )*
2296 };
2297 }
2298 assert_rpc_module!
2299 (
2300 "admin" => RethRpcModule::Admin,
2301 "debug" => RethRpcModule::Debug,
2302 "eth" => RethRpcModule::Eth,
2303 "net" => RethRpcModule::Net,
2304 "trace" => RethRpcModule::Trace,
2305 "web3" => RethRpcModule::Web3,
2306 "rpc" => RethRpcModule::Rpc,
2307 "ots" => RethRpcModule::Ots,
2308 "reth" => RethRpcModule::Reth,
2309 );
2310 }
2311
2312 #[test]
2313 fn test_default_selection() {
2314 let selection = RpcModuleSelection::Standard.to_selection();
2315 assert_eq!(selection, [RethRpcModule::Eth, RethRpcModule::Net, RethRpcModule::Web3].into())
2316 }
2317
2318 #[test]
2319 fn test_create_rpc_module_config() {
2320 let selection = vec!["eth", "admin"];
2321 let config = RpcModuleSelection::try_from_selection(selection).unwrap();
2322 assert_eq!(
2323 config,
2324 RpcModuleSelection::Selection([RethRpcModule::Eth, RethRpcModule::Admin].into())
2325 );
2326 }
2327
2328 #[test]
2329 fn test_configure_transport_config() {
2330 let config = TransportRpcModuleConfig::default()
2331 .with_http([RethRpcModule::Eth, RethRpcModule::Admin]);
2332 assert_eq!(
2333 config,
2334 TransportRpcModuleConfig {
2335 http: Some(RpcModuleSelection::Selection(
2336 [RethRpcModule::Eth, RethRpcModule::Admin].into()
2337 )),
2338 ws: None,
2339 ipc: None,
2340 config: None,
2341 }
2342 )
2343 }
2344
2345 #[test]
2346 fn test_configure_transport_config_none() {
2347 let config = TransportRpcModuleConfig::default().with_http(Vec::<RethRpcModule>::new());
2348 assert_eq!(
2349 config,
2350 TransportRpcModuleConfig {
2351 http: Some(RpcModuleSelection::Selection(Default::default())),
2352 ws: None,
2353 ipc: None,
2354 config: None,
2355 }
2356 )
2357 }
2358
2359 fn create_test_module() -> RpcModule<()> {
2360 let mut module = RpcModule::new(());
2361 module.register_method("anything", |_, _, _| "succeed").unwrap();
2362 module
2363 }
2364
2365 #[test]
2366 fn test_remove_http_method() {
2367 let mut modules =
2368 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2369 assert!(modules.remove_http_method("anything"));
2371
2372 assert!(!modules.remove_http_method("non_existent_method"));
2374
2375 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2377 }
2378
2379 #[test]
2380 fn test_remove_ws_method() {
2381 let mut modules =
2382 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2383
2384 assert!(modules.remove_ws_method("anything"));
2386
2387 assert!(!modules.remove_ws_method("non_existent_method"));
2389
2390 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2392 }
2393
2394 #[test]
2395 fn test_remove_ipc_method() {
2396 let mut modules =
2397 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2398
2399 assert!(modules.remove_ipc_method("anything"));
2401
2402 assert!(!modules.remove_ipc_method("non_existent_method"));
2404
2405 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2407 }
2408
2409 #[test]
2410 fn test_remove_method_from_configured() {
2411 let mut modules = TransportRpcModules {
2412 http: Some(create_test_module()),
2413 ws: Some(create_test_module()),
2414 ipc: Some(create_test_module()),
2415 ..Default::default()
2416 };
2417
2418 assert!(modules.remove_method_from_configured("anything"));
2420
2421 assert!(!modules.remove_method_from_configured("anything"));
2423
2424 assert!(!modules.remove_method_from_configured("non_existent_method"));
2426
2427 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2429 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2430 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2431 }
2432
2433 #[test]
2434 fn test_transport_rpc_module_rename() {
2435 let mut modules = TransportRpcModules {
2436 http: Some(create_test_module()),
2437 ws: Some(create_test_module()),
2438 ipc: Some(create_test_module()),
2439 ..Default::default()
2440 };
2441
2442 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2444 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2445 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2446
2447 assert!(modules.http.as_ref().unwrap().method("something").is_none());
2449 assert!(modules.ws.as_ref().unwrap().method("something").is_none());
2450 assert!(modules.ipc.as_ref().unwrap().method("something").is_none());
2451
2452 let mut other_module = RpcModule::new(());
2454 other_module.register_method("something", |_, _, _| "fails").unwrap();
2455
2456 modules.rename("anything", other_module).expect("rename failed");
2458
2459 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2461 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2462 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2463
2464 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2466 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2467 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2468 }
2469
2470 #[test]
2471 fn test_replace_http_method() {
2472 let mut modules =
2473 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2474
2475 let mut other_module = RpcModule::new(());
2476 other_module.register_method("something", |_, _, _| "fails").unwrap();
2477
2478 assert!(modules.replace_http(other_module.clone()).unwrap());
2479
2480 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2481
2482 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2483 assert!(modules.replace_http(other_module.clone()).unwrap());
2484
2485 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2486 }
2487 #[test]
2488 fn test_replace_ipc_method() {
2489 let mut modules =
2490 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2491
2492 let mut other_module = RpcModule::new(());
2493 other_module.register_method("something", |_, _, _| "fails").unwrap();
2494
2495 assert!(modules.replace_ipc(other_module.clone()).unwrap());
2496
2497 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2498
2499 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2500 assert!(modules.replace_ipc(other_module.clone()).unwrap());
2501
2502 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2503 }
2504 #[test]
2505 fn test_replace_ws_method() {
2506 let mut modules =
2507 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2508
2509 let mut other_module = RpcModule::new(());
2510 other_module.register_method("something", |_, _, _| "fails").unwrap();
2511
2512 assert!(modules.replace_ws(other_module.clone()).unwrap());
2513
2514 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2515
2516 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2517 assert!(modules.replace_ws(other_module.clone()).unwrap());
2518
2519 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2520 }
2521
2522 #[test]
2523 fn test_replace_configured() {
2524 let mut modules = TransportRpcModules {
2525 http: Some(create_test_module()),
2526 ws: Some(create_test_module()),
2527 ipc: Some(create_test_module()),
2528 ..Default::default()
2529 };
2530 let mut other_module = RpcModule::new(());
2531 other_module.register_method("something", |_, _, _| "fails").unwrap();
2532
2533 assert!(modules.replace_configured(other_module).unwrap());
2534
2535 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2537 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2538 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2539
2540 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2541 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2542 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2543 }
2544
2545 #[test]
2546 fn test_add_or_replace_if_module_configured() {
2547 let config = TransportRpcModuleConfig::default()
2549 .with_http([RethRpcModule::Eth])
2550 .with_ws([RethRpcModule::Eth]);
2551
2552 let mut http_module = RpcModule::new(());
2554 http_module.register_method("eth_existing", |_, _, _| "original").unwrap();
2555
2556 let mut ws_module = RpcModule::new(());
2558 ws_module.register_method("eth_existing", |_, _, _| "original").unwrap();
2559
2560 let ipc_module = RpcModule::new(());
2562
2563 let mut modules = TransportRpcModules {
2565 config,
2566 http: Some(http_module),
2567 ws: Some(ws_module),
2568 ipc: Some(ipc_module),
2569 };
2570
2571 let mut new_module = RpcModule::new(());
2573 new_module.register_method("eth_existing", |_, _, _| "replaced").unwrap(); new_module.register_method("eth_new", |_, _, _| "added").unwrap(); let new_methods: Methods = new_module.into();
2576
2577 let result = modules.add_or_replace_if_module_configured(RethRpcModule::Eth, new_methods);
2579 assert!(result.is_ok(), "Function should succeed");
2580
2581 let http = modules.http.as_ref().unwrap();
2583 assert!(http.method("eth_existing").is_some());
2584 assert!(http.method("eth_new").is_some());
2585
2586 let ws = modules.ws.as_ref().unwrap();
2588 assert!(ws.method("eth_existing").is_some());
2589 assert!(ws.method("eth_new").is_some());
2590
2591 let ipc = modules.ipc.as_ref().unwrap();
2593 assert!(ipc.method("eth_existing").is_none());
2594 assert!(ipc.method("eth_new").is_none());
2595 }
2596}