1#![doc(
15 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
16 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
17 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
18)]
19#![cfg_attr(not(test), warn(unused_crate_dependencies))]
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
23use alloy_network::{Ethereum, IntoWallet};
24use alloy_provider::{fillers::RecommendedFillers, Provider, ProviderBuilder};
25use core::marker::PhantomData;
26use error::{ConflictingModules, RpcError, ServerKind};
27use http::{header::AUTHORIZATION, HeaderMap};
28use jsonrpsee::{
29 core::RegisterMethodError,
30 server::{middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider, ServerHandle},
31 Methods, RpcModule,
32};
33use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
34use reth_consensus::FullConsensus;
35use reth_engine_primitives::ConsensusEngineEvent;
36use reth_evm::ConfigureEvm;
37use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
38use reth_primitives_traits::{NodePrimitives, TxTy};
39use reth_rpc::{
40 AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
41 OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
42};
43use reth_rpc_api::servers::*;
44use reth_rpc_eth_api::{
45 helpers::{
46 pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
47 TraceExt,
48 },
49 node::RpcNodeCoreAdapter,
50 EthApiServer, EthApiTypes, FullEthApiServer, FullEthApiTypes, RpcBlock, RpcConvert,
51 RpcConverter, RpcHeader, RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
52};
53use reth_rpc_eth_types::{receipt::EthReceiptConverter, EthConfig, EthSubscriptionIdProvider};
54use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
55pub use reth_rpc_server_types::RethRpcModule;
56use reth_storage_api::{
57 AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, NodePrimitivesProvider,
58 StateProviderFactory,
59};
60use reth_tasks::{pool::BlockingTaskGuard, Runtime};
61use reth_tokio_util::EventSender;
62use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
63use serde::{Deserialize, Serialize};
64use std::{
65 collections::HashMap,
66 fmt::Debug,
67 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
68 time::{Duration, SystemTime, UNIX_EPOCH},
69};
70use tower_http::cors::CorsLayer;
71
72pub use cors::CorsDomainError;
73
74pub use jsonrpsee::server::ServerBuilder;
76use jsonrpsee::server::ServerConfigBuilder;
77pub use reth_ipc::server::{
78 Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
79};
80pub use reth_rpc_server_types::{constants, RpcModuleSelection};
81pub use tower::layer::util::{Identity, Stack};
82
83pub mod auth;
85
86pub mod config;
88
89pub mod middleware;
91
92mod cors;
94
95pub mod error;
97
98pub mod eth;
100pub use eth::EthHandlers;
101
102mod metrics;
104use crate::middleware::RethRpcMiddleware;
105pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
106use reth_chain_state::{
107 CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions,
108};
109use reth_rpc::eth::sim_bundle::EthSimBundle;
110
111pub mod rate_limiter;
113
114#[derive(Debug, Clone)]
118pub struct RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus> {
119 provider: Provider,
121 pool: Pool,
123 network: Network,
125 executor: Option<Runtime>,
127 evm_config: EvmConfig,
129 consensus: Consensus,
131 _primitives: PhantomData<N>,
133}
134
135impl<N, Provider, Pool, Network, EvmConfig, Consensus>
138 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
139{
140 pub const fn new(
142 provider: Provider,
143 pool: Pool,
144 network: Network,
145 executor: Runtime,
146 evm_config: EvmConfig,
147 consensus: Consensus,
148 ) -> Self {
149 Self {
150 provider,
151 pool,
152 network,
153 executor: Some(executor),
154 evm_config,
155 consensus,
156 _primitives: PhantomData,
157 }
158 }
159
160 pub fn with_provider<P>(
162 self,
163 provider: P,
164 ) -> RpcModuleBuilder<N, P, Pool, Network, EvmConfig, Consensus> {
165 let Self { pool, network, executor, evm_config, consensus, _primitives, .. } = self;
166 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
167 }
168
169 pub fn with_pool<P>(
171 self,
172 pool: P,
173 ) -> RpcModuleBuilder<N, Provider, P, Network, EvmConfig, Consensus> {
174 let Self { provider, network, executor, evm_config, consensus, _primitives, .. } = self;
175 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
176 }
177
178 pub fn with_noop_pool(
184 self,
185 ) -> RpcModuleBuilder<N, Provider, NoopTransactionPool, Network, EvmConfig, Consensus> {
186 let Self { provider, executor, network, evm_config, consensus, _primitives, .. } = self;
187 RpcModuleBuilder {
188 provider,
189 executor,
190 network,
191 evm_config,
192 pool: NoopTransactionPool::default(),
193 consensus,
194 _primitives,
195 }
196 }
197
198 pub fn with_network<Net>(
200 self,
201 network: Net,
202 ) -> RpcModuleBuilder<N, Provider, Pool, Net, EvmConfig, Consensus> {
203 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
204 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
205 }
206
207 pub fn with_noop_network(
213 self,
214 ) -> RpcModuleBuilder<N, Provider, Pool, NoopNetwork, EvmConfig, Consensus> {
215 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
216 RpcModuleBuilder {
217 provider,
218 pool,
219 executor,
220 network: NoopNetwork::default(),
221 evm_config,
222 consensus,
223 _primitives,
224 }
225 }
226
227 pub fn with_executor(self, executor: Runtime) -> Self {
229 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
230 Self {
231 provider,
232 network,
233 pool,
234 executor: Some(executor),
235 evm_config,
236 consensus,
237 _primitives,
238 }
239 }
240
241 pub fn with_evm_config<E>(
243 self,
244 evm_config: E,
245 ) -> RpcModuleBuilder<N, Provider, Pool, Network, E, Consensus> {
246 let Self { provider, pool, executor, network, consensus, _primitives, .. } = self;
247 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
248 }
249
250 pub fn with_consensus<C>(
252 self,
253 consensus: C,
254 ) -> RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, C> {
255 let Self { provider, network, pool, executor, evm_config, _primitives, .. } = self;
256 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
257 }
258
259 #[expect(clippy::type_complexity)]
261 pub fn eth_api_builder<ChainSpec>(
262 &self,
263 ) -> EthApiBuilder<
264 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
265 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
266 >
267 where
268 Provider: Clone,
269 Pool: Clone,
270 Network: Clone,
271 EvmConfig: Clone,
272 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
273 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
274 {
275 EthApiBuilder::new(
276 self.provider.clone(),
277 self.pool.clone(),
278 self.network.clone(),
279 self.evm_config.clone(),
280 )
281 }
282
283 #[expect(clippy::type_complexity)]
289 pub fn bootstrap_eth_api<ChainSpec>(
290 &self,
291 ) -> EthApi<
292 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
293 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
294 >
295 where
296 Provider: Clone,
297 Pool: Clone,
298 Network: Clone,
299 EvmConfig: ConfigureEvm + Clone,
300 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
301 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
302 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>: RpcConvert,
303 (): PendingEnvBuilder<EvmConfig>,
304 {
305 self.eth_api_builder().build()
306 }
307}
308
309impl<N, Provider, Pool, Network, EvmConfig, Consensus>
310 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
311where
312 N: NodePrimitives,
313 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
314 + CanonStateSubscriptions<Primitives = N>
315 + ForkChoiceSubscriptions<Header = N::BlockHeader>
316 + PersistedBlockSubscriptions
317 + AccountReader
318 + ChangeSetReader,
319 Pool: TransactionPool + Clone + 'static,
320 Network: NetworkInfo + Peers + Clone + 'static,
321 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
322 Consensus: FullConsensus<N> + Clone + 'static,
323{
324 pub fn build_with_auth_server<EthApi>(
331 self,
332 module_config: TransportRpcModuleConfig,
333 engine: impl IntoEngineApiRpcModule,
334 eth: EthApi,
335 engine_events: EventSender<ConsensusEngineEvent<N>>,
336 ) -> (
337 TransportRpcModules,
338 AuthRpcModule,
339 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>,
340 )
341 where
342 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
343 {
344 let config = module_config.config.clone().unwrap_or_default();
345
346 let mut registry = self.into_registry(config, eth, engine_events);
347 let modules = registry.create_transport_rpc_modules(module_config);
348 let auth_module = registry.create_auth_module(engine);
349
350 (modules, auth_module, registry)
351 }
352
353 pub fn into_registry<EthApi>(
358 self,
359 config: RpcModuleConfig,
360 eth: EthApi,
361 engine_events: EventSender<ConsensusEngineEvent<N>>,
362 ) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
363 where
364 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
365 {
366 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
367 let executor =
368 executor.expect("RpcModuleBuilder requires a Runtime to be set via `with_executor`");
369 RpcRegistryInner::new(
370 provider,
371 pool,
372 network,
373 executor,
374 consensus,
375 config,
376 evm_config,
377 eth,
378 engine_events,
379 )
380 }
381
382 pub fn build<EthApi>(
385 self,
386 module_config: TransportRpcModuleConfig,
387 eth: EthApi,
388 engine_events: EventSender<ConsensusEngineEvent<N>>,
389 ) -> TransportRpcModules<()>
390 where
391 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
392 {
393 let mut modules = TransportRpcModules::default();
394
395 if !module_config.is_empty() {
396 let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
397
398 let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
399
400 modules.config = module_config;
401 modules.http = registry.maybe_module(http.as_ref());
402 modules.ws = registry.maybe_module(ws.as_ref());
403 modules.ipc = registry.maybe_module(ipc.as_ref());
404 }
405
406 modules
407 }
408}
409
410impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
411 fn default() -> Self {
412 Self {
413 provider: (),
414 pool: (),
415 network: (),
416 executor: None,
417 evm_config: (),
418 consensus: (),
419 _primitives: PhantomData,
420 }
421 }
422}
423
424#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
426pub struct RpcModuleConfig {
427 eth: EthConfig,
429}
430
431impl RpcModuleConfig {
434 pub fn builder() -> RpcModuleConfigBuilder {
436 RpcModuleConfigBuilder::default()
437 }
438
439 pub const fn new(eth: EthConfig) -> Self {
441 Self { eth }
442 }
443
444 pub const fn eth(&self) -> &EthConfig {
446 &self.eth
447 }
448
449 pub const fn eth_mut(&mut self) -> &mut EthConfig {
451 &mut self.eth
452 }
453}
454
455#[derive(Clone, Debug, Default)]
457pub struct RpcModuleConfigBuilder {
458 eth: Option<EthConfig>,
459}
460
461impl RpcModuleConfigBuilder {
464 pub fn eth(mut self, eth: EthConfig) -> Self {
466 self.eth = Some(eth);
467 self
468 }
469
470 pub fn build(self) -> RpcModuleConfig {
472 let Self { eth } = self;
473 RpcModuleConfig { eth: eth.unwrap_or_default() }
474 }
475
476 pub const fn get_eth(&self) -> Option<&EthConfig> {
478 self.eth.as_ref()
479 }
480
481 pub const fn eth_mut(&mut self) -> &mut Option<EthConfig> {
483 &mut self.eth
484 }
485
486 pub fn eth_mut_or_default(&mut self) -> &mut EthConfig {
488 self.eth.get_or_insert_with(EthConfig::default)
489 }
490}
491
492#[derive(Debug)]
494pub struct RpcRegistryInner<Provider, Pool, Network, EthApi: EthApiTypes, EvmConfig, Consensus> {
495 provider: Provider,
496 pool: Pool,
497 network: Network,
498 executor: Runtime,
499 evm_config: EvmConfig,
500 consensus: Consensus,
501 eth: EthHandlers<EthApi>,
503 blocking_pool_guard: BlockingTaskGuard,
505 modules: HashMap<RethRpcModule, Methods>,
507 eth_config: EthConfig,
509 engine_events:
511 EventSender<ConsensusEngineEvent<<EthApi::RpcConvert as RpcConvert>::Primitives>>,
512}
513
514impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
517 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
518where
519 N: NodePrimitives,
520 Provider: StateProviderFactory
521 + CanonStateSubscriptions<Primitives = N>
522 + BlockReader<Block = N::Block, Receipt = N::Receipt>
523 + Clone
524 + Unpin
525 + 'static,
526 Pool: Send + Sync + Clone + 'static,
527 Network: Clone + 'static,
528 EthApi: FullEthApiTypes + 'static,
529 EvmConfig: ConfigureEvm<Primitives = N>,
530{
531 #[expect(clippy::too_many_arguments)]
533 pub fn new(
534 provider: Provider,
535 pool: Pool,
536 network: Network,
537 executor: Runtime,
538 consensus: Consensus,
539 config: RpcModuleConfig,
540 evm_config: EvmConfig,
541 eth_api: EthApi,
542 engine_events: EventSender<
543 ConsensusEngineEvent<<EthApi::Provider as NodePrimitivesProvider>::Primitives>,
544 >,
545 ) -> Self
546 where
547 EvmConfig: ConfigureEvm<Primitives = N>,
548 {
549 let blocking_pool_guard = BlockingTaskGuard::new(config.eth.max_tracing_requests);
550
551 let eth = EthHandlers::bootstrap(config.eth.clone(), executor.clone(), eth_api);
552
553 Self {
554 provider,
555 pool,
556 network,
557 eth,
558 executor,
559 consensus,
560 modules: Default::default(),
561 blocking_pool_guard,
562 eth_config: config.eth,
563 evm_config,
564 engine_events,
565 }
566 }
567}
568
569impl<Provider, Pool, Network, EthApi, Evm, Consensus>
570 RpcRegistryInner<Provider, Pool, Network, EthApi, Evm, Consensus>
571where
572 EthApi: EthApiTypes,
573{
574 pub const fn eth_api(&self) -> &EthApi {
576 &self.eth.api
577 }
578
579 pub const fn eth_handlers(&self) -> &EthHandlers<EthApi> {
581 &self.eth
582 }
583
584 pub const fn pool(&self) -> &Pool {
586 &self.pool
587 }
588
589 pub const fn tasks(&self) -> &Runtime {
591 &self.executor
592 }
593
594 pub const fn provider(&self) -> &Provider {
596 &self.provider
597 }
598
599 pub const fn evm_config(&self) -> &Evm {
601 &self.evm_config
602 }
603
604 pub fn methods(&self) -> Vec<Methods> {
606 self.modules.values().cloned().collect()
607 }
608
609 pub fn module(&self) -> RpcModule<()> {
611 let mut module = RpcModule::new(());
612 for methods in self.modules.values().cloned() {
613 module.merge(methods).expect("No conflicts");
614 }
615 module
616 }
617}
618
619impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
620 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
621where
622 Network: NetworkInfo + Clone + 'static,
623 EthApi: EthApiTypes,
624 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks>,
625 EvmConfig: ConfigureEvm,
626{
627 pub fn admin_api(&self) -> AdminApi<Network, Provider::ChainSpec, Pool>
629 where
630 Network: Peers,
631 Pool: TransactionPool + Clone + 'static,
632 {
633 AdminApi::new(self.network.clone(), self.provider.chain_spec(), self.pool.clone())
634 }
635
636 pub fn web3_api(&self) -> Web3Api<Network> {
638 Web3Api::new(self.network.clone())
639 }
640
641 pub fn register_admin(&mut self) -> &mut Self
643 where
644 Network: Peers,
645 Pool: TransactionPool + Clone + 'static,
646 {
647 let adminapi = self.admin_api();
648 self.modules.insert(RethRpcModule::Admin, adminapi.into_rpc().into());
649 self
650 }
651
652 pub fn register_web3(&mut self) -> &mut Self {
654 let web3api = self.web3_api();
655 self.modules.insert(RethRpcModule::Web3, web3api.into_rpc().into());
656 self
657 }
658}
659
660impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
661 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
662where
663 N: NodePrimitives,
664 Provider: FullRpcProvider<
665 Header = N::BlockHeader,
666 Block = N::Block,
667 Receipt = N::Receipt,
668 Transaction = N::SignedTx,
669 > + AccountReader
670 + ChangeSetReader
671 + CanonStateSubscriptions<Primitives = N>
672 + ForkChoiceSubscriptions<Header = N::BlockHeader>
673 + PersistedBlockSubscriptions,
674 Network: NetworkInfo + Peers + Clone + 'static,
675 EthApi: EthApiServer<
676 RpcTxReq<EthApi::NetworkTypes>,
677 RpcTransaction<EthApi::NetworkTypes>,
678 RpcBlock<EthApi::NetworkTypes>,
679 RpcReceipt<EthApi::NetworkTypes>,
680 RpcHeader<EthApi::NetworkTypes>,
681 TxTy<N>,
682 > + EthApiTypes,
683 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
684{
685 pub fn register_eth(&mut self) -> &mut Self {
691 let eth_api = self.eth_api().clone();
692 self.modules.insert(RethRpcModule::Eth, eth_api.into_rpc().into());
693 self
694 }
695
696 pub fn register_ots(&mut self) -> &mut Self
702 where
703 EthApi: TraceExt + EthTransactions<Primitives = N>,
704 {
705 let otterscan_api = self.otterscan_api();
706 self.modules.insert(RethRpcModule::Ots, otterscan_api.into_rpc().into());
707 self
708 }
709
710 pub fn register_debug(&mut self) -> &mut Self
716 where
717 EthApi: EthTransactions + TraceExt,
718 {
719 let debug_api = self.debug_api();
720 self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
721 self
722 }
723
724 pub fn register_trace(&mut self) -> &mut Self
730 where
731 EthApi: TraceExt,
732 {
733 let trace_api = self.trace_api();
734 self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
735 self
736 }
737
738 pub fn register_net(&mut self) -> &mut Self
746 where
747 EthApi: EthApiSpec + 'static,
748 {
749 let netapi = self.net_api();
750 self.modules.insert(RethRpcModule::Net, netapi.into_rpc().into());
751 self
752 }
753
754 pub fn register_reth(&mut self) -> &mut Self {
762 let rethapi = self.reth_api();
763 self.modules.insert(RethRpcModule::Reth, rethapi.into_rpc().into());
764 self
765 }
766
767 pub fn otterscan_api(&self) -> OtterscanApi<EthApi> {
773 let eth_api = self.eth_api().clone();
774 OtterscanApi::new(eth_api)
775 }
776}
777
778impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
779 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
780where
781 N: NodePrimitives,
782 Provider: FullRpcProvider<
783 Block = N::Block,
784 Header = N::BlockHeader,
785 Transaction = N::SignedTx,
786 Receipt = N::Receipt,
787 > + AccountReader
788 + ChangeSetReader,
789 Network: NetworkInfo + Peers + Clone + 'static,
790 EthApi: EthApiTypes,
791 EvmConfig: ConfigureEvm<Primitives = N>,
792{
793 pub fn trace_api(&self) -> TraceApi<EthApi> {
799 TraceApi::new(
800 self.eth_api().clone(),
801 self.blocking_pool_guard.clone(),
802 self.eth_config.clone(),
803 )
804 }
805
806 pub fn bundle_api(&self) -> EthBundle<EthApi>
812 where
813 EthApi: EthTransactions + LoadPendingBlock + Call,
814 {
815 let eth_api = self.eth_api().clone();
816 EthBundle::new(eth_api, self.blocking_pool_guard.clone())
817 }
818
819 pub fn debug_api(&self) -> DebugApi<EthApi>
825 where
826 EthApi: FullEthApiTypes,
827 {
828 DebugApi::new(
829 self.eth_api().clone(),
830 self.blocking_pool_guard.clone(),
831 self.tasks(),
832 self.engine_events.new_listener(),
833 )
834 }
835
836 pub fn net_api(&self) -> NetApi<Network, EthApi>
842 where
843 EthApi: EthApiSpec + 'static,
844 {
845 let eth_api = self.eth_api().clone();
846 NetApi::new(self.network.clone(), eth_api)
847 }
848
849 pub fn reth_api(&self) -> RethApi<Provider> {
851 RethApi::new(self.provider.clone(), self.executor.clone())
852 }
853}
854
855impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
856 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
857where
858 N: NodePrimitives,
859 Provider: FullRpcProvider<Block = N::Block>
860 + CanonStateSubscriptions<Primitives = N>
861 + ForkChoiceSubscriptions<Header = N::BlockHeader>
862 + PersistedBlockSubscriptions
863 + AccountReader
864 + ChangeSetReader,
865 Pool: TransactionPool + Clone + 'static,
866 Network: NetworkInfo + Peers + Clone + 'static,
867 EthApi: FullEthApiServer,
868 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
869 Consensus: FullConsensus<N> + Clone + 'static,
870{
871 pub fn create_auth_module(&self, engine_api: impl IntoEngineApiRpcModule) -> AuthRpcModule {
877 let mut module = engine_api.into_rpc_module();
878
879 let eth_handlers = self.eth_handlers();
881 let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
882
883 module.merge(engine_eth.into_rpc()).expect("No conflicting methods");
884
885 AuthRpcModule { inner: module }
886 }
887
888 fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
890 config.map(|config| self.module_for(config))
891 }
892
893 pub fn create_transport_rpc_modules(
897 &mut self,
898 config: TransportRpcModuleConfig,
899 ) -> TransportRpcModules<()> {
900 let mut modules = TransportRpcModules::default();
901 let http = self.maybe_module(config.http.as_ref());
902 let ws = self.maybe_module(config.ws.as_ref());
903 let ipc = self.maybe_module(config.ipc.as_ref());
904
905 modules.config = config;
906 modules.http = http;
907 modules.ws = ws;
908 modules.ipc = ipc;
909 modules
910 }
911
912 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
915 let mut module = RpcModule::new(());
916 let all_methods = self.reth_methods(config.iter_selection());
917 for methods in all_methods {
918 module.merge(methods).expect("No conflicts");
919 }
920 module
921 }
922
923 pub fn reth_methods(
932 &mut self,
933 namespaces: impl Iterator<Item = RethRpcModule>,
934 ) -> Vec<Methods> {
935 let EthHandlers { api: eth_api, filter: eth_filter, pubsub: eth_pubsub, .. } =
936 self.eth_handlers().clone();
937
938 let namespaces: Vec<_> = namespaces.collect();
940 namespaces
941 .iter()
942 .map(|namespace| {
943 self.modules
944 .entry(namespace.clone())
945 .or_insert_with(|| match namespace.clone() {
946 RethRpcModule::Admin => AdminApi::new(
947 self.network.clone(),
948 self.provider.chain_spec(),
949 self.pool.clone(),
950 )
951 .into_rpc()
952 .into(),
953 RethRpcModule::Debug => DebugApi::new(
954 eth_api.clone(),
955 self.blocking_pool_guard.clone(),
956 &self.executor,
957 self.engine_events.new_listener(),
958 )
959 .into_rpc()
960 .into(),
961 RethRpcModule::Eth => {
962 let mut module = eth_api.clone().into_rpc();
964 module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
965 module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
966 module
967 .merge(
968 EthBundle::new(
969 eth_api.clone(),
970 self.blocking_pool_guard.clone(),
971 )
972 .into_rpc(),
973 )
974 .expect("No conflicts");
975
976 module.into()
977 }
978 RethRpcModule::Net => {
979 NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
980 }
981 RethRpcModule::Trace => TraceApi::new(
982 eth_api.clone(),
983 self.blocking_pool_guard.clone(),
984 self.eth_config.clone(),
985 )
986 .into_rpc()
987 .into(),
988 RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
989 RethRpcModule::Txpool => TxPoolApi::new(
990 self.eth.api.pool().clone(),
991 dyn_clone::clone(self.eth.api.converter()),
992 )
993 .into_rpc()
994 .into(),
995 RethRpcModule::Rpc => RPCApi::new(
996 namespaces
997 .iter()
998 .map(|module| (module.to_string(), "1.0".to_string()))
999 .collect(),
1000 )
1001 .into_rpc()
1002 .into(),
1003 RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
1004 RethRpcModule::Reth => {
1005 RethApi::new(self.provider.clone(), self.executor.clone())
1006 .into_rpc()
1007 .into()
1008 }
1009 RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
1010 RethRpcModule::Mev => {
1011 EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
1012 .into_rpc()
1013 .into()
1014 }
1015 RethRpcModule::Flashbots |
1019 RethRpcModule::Testing |
1020 RethRpcModule::Other(_) => Default::default(),
1021 })
1022 .clone()
1023 })
1024 .collect::<Vec<_>>()
1025 }
1026}
1027
1028impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus> Clone
1029 for RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
1030where
1031 EthApi: EthApiTypes,
1032 Provider: Clone,
1033 Pool: Clone,
1034 Network: Clone,
1035 EvmConfig: Clone,
1036 Consensus: Clone,
1037{
1038 fn clone(&self) -> Self {
1039 Self {
1040 provider: self.provider.clone(),
1041 pool: self.pool.clone(),
1042 network: self.network.clone(),
1043 executor: self.executor.clone(),
1044 evm_config: self.evm_config.clone(),
1045 consensus: self.consensus.clone(),
1046 eth: self.eth.clone(),
1047 blocking_pool_guard: self.blocking_pool_guard.clone(),
1048 modules: self.modules.clone(),
1049 eth_config: self.eth_config.clone(),
1050 engine_events: self.engine_events.clone(),
1051 }
1052 }
1053}
1054
1055#[derive(Debug)]
1067pub struct RpcServerConfig<RpcMiddleware = Identity> {
1068 http_server_config: Option<ServerConfigBuilder>,
1070 http_cors_domains: Option<String>,
1072 http_addr: Option<SocketAddr>,
1074 http_disable_compression: bool,
1076 ws_server_config: Option<ServerConfigBuilder>,
1078 ws_cors_domains: Option<String>,
1080 ws_addr: Option<SocketAddr>,
1082 ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
1084 ipc_endpoint: Option<String>,
1086 jwt_secret: Option<JwtSecret>,
1088 rpc_middleware: RpcMiddleware,
1090}
1091
1092impl Default for RpcServerConfig<Identity> {
1095 fn default() -> Self {
1097 Self {
1098 http_server_config: None,
1099 http_cors_domains: None,
1100 http_addr: None,
1101 http_disable_compression: false,
1102 ws_server_config: None,
1103 ws_cors_domains: None,
1104 ws_addr: None,
1105 ipc_server_config: None,
1106 ipc_endpoint: None,
1107 jwt_secret: None,
1108 rpc_middleware: Default::default(),
1109 }
1110 }
1111}
1112
1113impl RpcServerConfig {
1114 pub fn http(config: ServerConfigBuilder) -> Self {
1116 Self::default().with_http(config)
1117 }
1118
1119 pub fn ws(config: ServerConfigBuilder) -> Self {
1121 Self::default().with_ws(config)
1122 }
1123
1124 pub fn ipc(config: IpcServerBuilder<Identity, Identity>) -> Self {
1126 Self::default().with_ipc(config)
1127 }
1128
1129 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
1134 self.http_server_config =
1135 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1136 self
1137 }
1138
1139 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
1144 self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1145 self
1146 }
1147
1148 pub fn with_ipc(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
1153 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1154 self
1155 }
1156}
1157
1158impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
1159 pub fn set_rpc_middleware<T>(self, rpc_middleware: T) -> RpcServerConfig<T> {
1161 RpcServerConfig {
1162 http_server_config: self.http_server_config,
1163 http_cors_domains: self.http_cors_domains,
1164 http_addr: self.http_addr,
1165 http_disable_compression: self.http_disable_compression,
1166 ws_server_config: self.ws_server_config,
1167 ws_cors_domains: self.ws_cors_domains,
1168 ws_addr: self.ws_addr,
1169 ipc_server_config: self.ipc_server_config,
1170 ipc_endpoint: self.ipc_endpoint,
1171 jwt_secret: self.jwt_secret,
1172 rpc_middleware,
1173 }
1174 }
1175
1176 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
1178 self.with_http_cors(cors_domain.clone()).with_ws_cors(cors_domain)
1179 }
1180
1181 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
1183 self.ws_cors_domains = cors_domain;
1184 self
1185 }
1186
1187 pub const fn with_http_disable_compression(mut self, http_disable_compression: bool) -> Self {
1189 self.http_disable_compression = http_disable_compression;
1190 self
1191 }
1192
1193 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
1195 self.http_cors_domains = cors_domain;
1196 self
1197 }
1198
1199 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
1204 self.http_addr = Some(addr);
1205 self
1206 }
1207
1208 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
1213 self.ws_addr = Some(addr);
1214 self
1215 }
1216
1217 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
1221 where
1222 I: IdProvider + Clone + 'static,
1223 {
1224 if let Some(config) = self.http_server_config {
1225 self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
1226 }
1227 if let Some(config) = self.ws_server_config {
1228 self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
1229 }
1230 if let Some(ipc) = self.ipc_server_config {
1231 self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
1232 }
1233
1234 self
1235 }
1236
1237 pub fn with_ipc_endpoint(mut self, path: impl Into<String>) -> Self {
1241 self.ipc_endpoint = Some(path.into());
1242 self
1243 }
1244
1245 pub const fn with_jwt_secret(mut self, secret: Option<JwtSecret>) -> Self {
1247 self.jwt_secret = secret;
1248 self
1249 }
1250
1251 pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
1253 let Some(tokio_runtime) = tokio_runtime else { return self };
1254 if let Some(http_server_config) = self.http_server_config {
1255 self.http_server_config =
1256 Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1257 }
1258 if let Some(ws_server_config) = self.ws_server_config {
1259 self.ws_server_config =
1260 Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1261 }
1262 if let Some(ipc_server_config) = self.ipc_server_config {
1263 self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
1264 }
1265 self
1266 }
1267
1268 pub const fn has_server(&self) -> bool {
1272 self.http_server_config.is_some() ||
1273 self.ws_server_config.is_some() ||
1274 self.ipc_server_config.is_some()
1275 }
1276
1277 pub const fn http_address(&self) -> Option<SocketAddr> {
1279 self.http_addr
1280 }
1281
1282 pub const fn ws_address(&self) -> Option<SocketAddr> {
1284 self.ws_addr
1285 }
1286
1287 pub fn ipc_endpoint(&self) -> Option<String> {
1289 self.ipc_endpoint.clone()
1290 }
1291
1292 fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
1294 cors.as_deref().map(cors::create_cors_layer).transpose()
1295 }
1296
1297 fn maybe_jwt_layer(jwt_secret: Option<JwtSecret>) -> Option<AuthLayer<JwtAuthValidator>> {
1299 jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
1300 }
1301
1302 fn maybe_compression_layer(disable_compression: bool) -> Option<CompressionLayer> {
1305 if disable_compression {
1306 None
1307 } else {
1308 Some(CompressionLayer::new())
1309 }
1310 }
1311
/// Builds and starts every configured server and serves the given `modules` on them.
///
/// The servers are handled in this order:
///
/// 1. IPC: started first if an IPC server config is set.
/// 2. HTTP + WS on one socket: used when `http_addr == ws_addr` and both an HTTP and a WS
///    server config are set. A single server is bound to the HTTP socket address and its
///    handle is shared by both transports.
/// 3. Otherwise the WS server and then the HTTP server are bound independently, and started
///    once both bind steps succeeded.
///
/// Addresses that were not configured fall back to `127.0.0.1` with
/// `constants::DEFAULT_HTTP_RPC_PORT` / `constants::DEFAULT_WS_RPC_PORT`, and the IPC endpoint
/// falls back to `constants::DEFAULT_IPC_ENDPOINT`.
///
/// # Errors
///
/// - [`WsHttpSamePortError::ConflictingCorsDomains`] (converted into [`RpcError`]) if HTTP and
///   WS share a socket but their CORS domains differ after trimming.
/// - The error of `ensure_ws_http_identical` if HTTP and WS share a socket but their module
///   selections differ.
/// - Errors from building a CORS layer, binding a server, reading its local address or
///   starting the IPC server.
///
/// # Panics
///
/// Panics if a server config is set for a transport on the separate-socket path (or for IPC)
/// while `modules` holds no module for that transport.
pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError>
where
    RpcMiddleware: RethRpcMiddleware,
{
    let mut http_handle = None;
    let mut ws_handle = None;
    let mut ipc_handle = None;

    // Resolve the socket addresses, falling back to localhost and the default ports.
    let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::LOCALHOST,
        constants::DEFAULT_HTTP_RPC_PORT,
    )));

    let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::LOCALHOST,
        constants::DEFAULT_WS_RPC_PORT,
    )));

    // Request metrics for IPC are derived from the IPC module; default metrics otherwise.
    let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
    let ipc_path =
        self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());

    if let Some(builder) = self.ipc_server_config {
        let ipc = builder
            .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
            .build(ipc_path);
        // Panics if an IPC server is configured but no IPC module was installed.
        ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
    }

    // Same-socket mode: compares the *configured* (optional) addresses, not the resolved
    // ones, and requires both server configs to be present.
    if self.http_addr == self.ws_addr &&
        self.http_server_config.is_some() &&
        self.ws_server_config.is_some()
    {
        // Pick the CORS domains for the shared server: if both are set they must agree
        // (ignoring surrounding whitespace), otherwise whichever one is set is used.
        let cors = match (self.ws_cors_domains.as_ref(), self.http_cors_domains.as_ref()) {
            (Some(ws_cors), Some(http_cors)) => {
                if ws_cors.trim() != http_cors.trim() {
                    return Err(WsHttpSamePortError::ConflictingCorsDomains {
                        http_cors_domains: Some(http_cors.clone()),
                        ws_cors_domains: Some(ws_cors.clone()),
                    }
                    .into());
                }
                Some(ws_cors)
            }
            (a, b) => a.or(b),
        }
        .cloned();

        // A shared server can only expose one set of modules.
        modules.config.ensure_ws_http_identical()?;

        // Always matches here because of the `is_some()` check above; the binding moves the
        // config out of `self`, which is fine because this branch always returns.
        if let Some(config) = self.http_server_config {
            let server = ServerBuilder::new()
                .set_http_middleware(
                    tower::ServiceBuilder::new()
                        .option_layer(Self::maybe_cors_layer(cors)?)
                        .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
                        .option_layer(Self::maybe_compression_layer(
                            self.http_disable_compression,
                        )),
                )
                .set_rpc_middleware(
                    RpcServiceBuilder::default()
                        .layer(
                            modules
                                .http
                                .as_ref()
                                .or(modules.ws.as_ref())
                                .map(RpcRequestMetrics::same_port)
                                .unwrap_or_default(),
                        )
                        .layer(self.rpc_middleware.clone()),
                )
                .set_config(config.build())
                .build(http_socket_addr)
                .await
                .map_err(|err| {
                    RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
                })?;
            let addr = server.local_addr().map_err(|err| {
                RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
            })?;
            // The HTTP module is preferred, the WS module is the fallback. If neither is
            // installed the server is not started and both handles stay `None`.
            if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
                let handle = server.start(module.clone());
                http_handle = Some(handle.clone());
                ws_handle = Some(handle);
            }
            return Ok(RpcServerHandle {
                http_local_addr: Some(addr),
                ws_local_addr: Some(addr),
                http: http_handle,
                ws: ws_handle,
                ipc_endpoint: self.ipc_endpoint.clone(),
                ipc: ipc_handle,
                jwt_secret: self.jwt_secret,
            });
        }
    }

    // Separate-socket mode: bind WS and HTTP independently, then start both.
    let mut ws_local_addr = None;
    let mut ws_server = None;
    let mut http_local_addr = None;
    let mut http_server = None;

    if let Some(config) = self.ws_server_config {
        // WS-only server: CORS and JWT layers, but no compression layer.
        let server = ServerBuilder::new()
            .set_config(config.ws_only().build())
            .set_http_middleware(
                tower::ServiceBuilder::new()
                    .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
                    .option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
            )
            .set_rpc_middleware(
                RpcServiceBuilder::default()
                    .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default())
                    .layer(self.rpc_middleware.clone()),
            )
            .build(ws_socket_addr)
            .await
            .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

        let addr = server
            .local_addr()
            .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

        ws_local_addr = Some(addr);
        ws_server = Some(server);
    }

    if let Some(config) = self.http_server_config {
        // HTTP-only server: CORS, JWT and (unless disabled) compression layers.
        let server = ServerBuilder::new()
            .set_config(config.http_only().build())
            .set_http_middleware(
                tower::ServiceBuilder::new()
                    .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
                    .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
                    .option_layer(Self::maybe_compression_layer(self.http_disable_compression)),
            )
            .set_rpc_middleware(
                RpcServiceBuilder::default()
                    .layer(
                        modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
                    )
                    .layer(self.rpc_middleware.clone()),
            )
            .build(http_socket_addr)
            .await
            .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
        let local_addr = server
            .local_addr()
            .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
        http_local_addr = Some(local_addr);
        http_server = Some(server);
    }

    // Start the servers only after every bind step succeeded. Each `expect` panics if a
    // server was configured for a transport that has no installed module.
    http_handle = http_server
        .map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
    ws_handle = ws_server
        .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
    Ok(RpcServerHandle {
        http_local_addr,
        ws_local_addr,
        http: http_handle,
        ws: ws_handle,
        ipc_endpoint: self.ipc_endpoint.clone(),
        ipc: ipc_handle,
        jwt_secret: self.jwt_secret,
    })
}
1487}
1488
/// Describes which RPC modules are selected for each transport (HTTP, WS, IPC), together
/// with an optional [`RpcModuleConfig`].
///
/// Every field is optional; the `Default` value has nothing selected.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct TransportRpcModuleConfig {
    /// Module selection for the HTTP transport, if any.
    http: Option<RpcModuleSelection>,
    /// Module selection for the WS transport, if any.
    ws: Option<RpcModuleSelection>,
    /// Module selection for the IPC transport, if any.
    ipc: Option<RpcModuleSelection>,
    /// Optional additional module configuration.
    config: Option<RpcModuleConfig>,
}
1511
1512impl TransportRpcModuleConfig {
1515 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
1517 Self::default().with_http(http)
1518 }
1519
1520 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
1522 Self::default().with_ws(ws)
1523 }
1524
1525 pub fn set_ipc(ipc: impl Into<RpcModuleSelection>) -> Self {
1527 Self::default().with_ipc(ipc)
1528 }
1529
1530 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
1532 self.http = Some(http.into());
1533 self
1534 }
1535
1536 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
1538 self.ws = Some(ws.into());
1539 self
1540 }
1541
1542 pub fn with_ipc(mut self, ipc: impl Into<RpcModuleSelection>) -> Self {
1544 self.ipc = Some(ipc.into());
1545 self
1546 }
1547
1548 pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
1550 self.config = Some(config);
1551 self
1552 }
1553
1554 pub const fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1556 &mut self.http
1557 }
1558
1559 pub const fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1561 &mut self.ws
1562 }
1563
1564 pub const fn ipc_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1566 &mut self.ipc
1567 }
1568
1569 pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
1571 &mut self.config
1572 }
1573
1574 pub const fn is_empty(&self) -> bool {
1576 self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
1577 }
1578
1579 pub const fn http(&self) -> Option<&RpcModuleSelection> {
1581 self.http.as_ref()
1582 }
1583
1584 pub const fn ws(&self) -> Option<&RpcModuleSelection> {
1586 self.ws.as_ref()
1587 }
1588
1589 pub const fn ipc(&self) -> Option<&RpcModuleSelection> {
1591 self.ipc.as_ref()
1592 }
1593
1594 pub const fn config(&self) -> Option<&RpcModuleConfig> {
1596 self.config.as_ref()
1597 }
1598
1599 pub fn contains_any(&self, module: &RethRpcModule) -> bool {
1601 self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
1602 }
1603
1604 pub fn contains_http(&self, module: &RethRpcModule) -> bool {
1606 self.http.as_ref().is_some_and(|http| http.contains(module))
1607 }
1608
1609 pub fn contains_ws(&self, module: &RethRpcModule) -> bool {
1611 self.ws.as_ref().is_some_and(|ws| ws.contains(module))
1612 }
1613
1614 pub fn contains_ipc(&self, module: &RethRpcModule) -> bool {
1616 self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
1617 }
1618
1619 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
1622 if RpcModuleSelection::are_identical(self.http.as_ref(), self.ws.as_ref()) {
1623 Ok(())
1624 } else {
1625 let http_modules =
1626 self.http.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1627 let ws_modules =
1628 self.ws.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1629
1630 let http_not_ws = http_modules.difference(&ws_modules).cloned().collect();
1631 let ws_not_http = ws_modules.difference(&http_modules).cloned().collect();
1632 let overlap = http_modules.intersection(&ws_modules).cloned().collect();
1633
1634 Err(WsHttpSamePortError::ConflictingModules(Box::new(ConflictingModules {
1635 overlap,
1636 http_not_ws,
1637 ws_not_http,
1638 })))
1639 }
1640 }
1641}
1642
/// Holds the installed [`RpcModule`] for each transport (HTTP, WS, IPC) together with the
/// [`TransportRpcModuleConfig`] that describes the per-transport module selection.
#[derive(Debug, Clone, Default)]
pub struct TransportRpcModules<Context = ()> {
    /// The per-transport module selection these modules are associated with.
    config: TransportRpcModuleConfig,
    /// Module served over HTTP, if installed.
    http: Option<RpcModule<Context>>,
    /// Module served over WS, if installed.
    ws: Option<RpcModule<Context>>,
    /// Module served over IPC, if installed.
    ipc: Option<RpcModule<Context>>,
}
1655
1656impl TransportRpcModules {
1659 pub fn with_config(mut self, config: TransportRpcModuleConfig) -> Self {
1662 self.config = config;
1663 self
1664 }
1665
1666 pub fn with_http(mut self, http: RpcModule<()>) -> Self {
1669 self.http = Some(http);
1670 self
1671 }
1672
1673 pub fn with_ws(mut self, ws: RpcModule<()>) -> Self {
1676 self.ws = Some(ws);
1677 self
1678 }
1679
1680 pub fn with_ipc(mut self, ipc: RpcModule<()>) -> Self {
1683 self.ipc = Some(ipc);
1684 self
1685 }
1686
1687 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
1689 &self.config
1690 }
1691
1692 pub fn merge_if_module_configured(
1697 &mut self,
1698 module: RethRpcModule,
1699 other: impl Into<Methods>,
1700 ) -> Result<(), RegisterMethodError> {
1701 let other = other.into();
1702 if self.module_config().contains_http(&module) {
1703 self.merge_http(other.clone())?;
1704 }
1705 if self.module_config().contains_ws(&module) {
1706 self.merge_ws(other.clone())?;
1707 }
1708 if self.module_config().contains_ipc(&module) {
1709 self.merge_ipc(other)?;
1710 }
1711
1712 Ok(())
1713 }
1714
1715 pub fn merge_if_module_configured_with<F>(
1722 &mut self,
1723 module: RethRpcModule,
1724 f: F,
1725 ) -> Result<(), RegisterMethodError>
1726 where
1727 F: FnOnce() -> Methods,
1728 {
1729 if !self.module_config().contains_any(&module) {
1731 return Ok(());
1732 }
1733 self.merge_if_module_configured(module, f())
1734 }
1735
1736 pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1742 if let Some(ref mut http) = self.http {
1743 return http.merge(other.into()).map(|_| true)
1744 }
1745 Ok(false)
1746 }
1747
1748 pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1754 if let Some(ref mut ws) = self.ws {
1755 return ws.merge(other.into()).map(|_| true)
1756 }
1757 Ok(false)
1758 }
1759
1760 pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1766 if let Some(ref mut ipc) = self.ipc {
1767 return ipc.merge(other.into()).map(|_| true)
1768 }
1769 Ok(false)
1770 }
1771
1772 pub fn merge_configured(
1776 &mut self,
1777 other: impl Into<Methods>,
1778 ) -> Result<(), RegisterMethodError> {
1779 let other = other.into();
1780 self.merge_http(other.clone())?;
1781 self.merge_ws(other.clone())?;
1782 self.merge_ipc(other)?;
1783 Ok(())
1784 }
1785
1786 pub fn methods_by_module(&self, module: RethRpcModule) -> Methods {
1790 self.methods_by(|name| name.starts_with(module.as_str()))
1791 }
1792
1793 pub fn methods_by<F>(&self, mut filter: F) -> Methods
1797 where
1798 F: FnMut(&str) -> bool,
1799 {
1800 let mut methods = Methods::new();
1801
1802 let mut f =
1804 |name: &str, mm: &Methods| filter(name) && !mm.method_names().any(|m| m == name);
1805
1806 if let Some(m) = self.http_methods(|name| f(name, &methods)) {
1807 let _ = methods.merge(m);
1808 }
1809 if let Some(m) = self.ws_methods(|name| f(name, &methods)) {
1810 let _ = methods.merge(m);
1811 }
1812 if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
1813 let _ = methods.merge(m);
1814 }
1815 methods
1816 }
1817
1818 pub fn http_methods<F>(&self, filter: F) -> Option<Methods>
1822 where
1823 F: FnMut(&str) -> bool,
1824 {
1825 self.http.as_ref().map(|module| methods_by(module, filter))
1826 }
1827
1828 pub fn ws_methods<F>(&self, filter: F) -> Option<Methods>
1832 where
1833 F: FnMut(&str) -> bool,
1834 {
1835 self.ws.as_ref().map(|module| methods_by(module, filter))
1836 }
1837
1838 pub fn ipc_methods<F>(&self, filter: F) -> Option<Methods>
1842 where
1843 F: FnMut(&str) -> bool,
1844 {
1845 self.ipc.as_ref().map(|module| methods_by(module, filter))
1846 }
1847
1848 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
1856 if let Some(http_module) = &mut self.http {
1857 http_module.remove_method(method_name).is_some()
1858 } else {
1859 false
1860 }
1861 }
1862
1863 pub fn remove_http_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1865 for name in methods {
1866 self.remove_http_method(name);
1867 }
1868 }
1869
1870 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
1878 if let Some(ws_module) = &mut self.ws {
1879 ws_module.remove_method(method_name).is_some()
1880 } else {
1881 false
1882 }
1883 }
1884
1885 pub fn remove_ws_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1887 for name in methods {
1888 self.remove_ws_method(name);
1889 }
1890 }
1891
1892 pub fn remove_ipc_method(&mut self, method_name: &'static str) -> bool {
1900 if let Some(ipc_module) = &mut self.ipc {
1901 ipc_module.remove_method(method_name).is_some()
1902 } else {
1903 false
1904 }
1905 }
1906
1907 pub fn remove_ipc_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1909 for name in methods {
1910 self.remove_ipc_method(name);
1911 }
1912 }
1913
1914 pub fn remove_method_from_configured(&mut self, method_name: &'static str) -> bool {
1918 let http_removed = self.remove_http_method(method_name);
1919 let ws_removed = self.remove_ws_method(method_name);
1920 let ipc_removed = self.remove_ipc_method(method_name);
1921
1922 http_removed || ws_removed || ipc_removed
1923 }
1924
1925 pub fn rename(
1929 &mut self,
1930 old_name: &'static str,
1931 new_method: impl Into<Methods>,
1932 ) -> Result<(), RegisterMethodError> {
1933 self.remove_method_from_configured(old_name);
1935
1936 self.merge_configured(new_method)
1938 }
1939
1940 pub fn replace_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1947 let other = other.into();
1948 self.remove_http_methods(other.method_names());
1949 self.merge_http(other)
1950 }
1951
1952 pub fn replace_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1959 let other = other.into();
1960 self.remove_ipc_methods(other.method_names());
1961 self.merge_ipc(other)
1962 }
1963
1964 pub fn replace_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1971 let other = other.into();
1972 self.remove_ws_methods(other.method_names());
1973 self.merge_ws(other)
1974 }
1975
1976 pub fn replace_configured(
1980 &mut self,
1981 other: impl Into<Methods>,
1982 ) -> Result<bool, RegisterMethodError> {
1983 let other = other.into();
1984 self.replace_http(other.clone())?;
1985 self.replace_ws(other.clone())?;
1986 self.replace_ipc(other)?;
1987 Ok(true)
1988 }
1989
1990 pub fn add_or_replace_http(
1994 &mut self,
1995 other: impl Into<Methods>,
1996 ) -> Result<bool, RegisterMethodError> {
1997 let other = other.into();
1998 self.remove_http_methods(other.method_names());
1999 self.merge_http(other)
2000 }
2001
2002 pub fn add_or_replace_ws(
2006 &mut self,
2007 other: impl Into<Methods>,
2008 ) -> Result<bool, RegisterMethodError> {
2009 let other = other.into();
2010 self.remove_ws_methods(other.method_names());
2011 self.merge_ws(other)
2012 }
2013
2014 pub fn add_or_replace_ipc(
2018 &mut self,
2019 other: impl Into<Methods>,
2020 ) -> Result<bool, RegisterMethodError> {
2021 let other = other.into();
2022 self.remove_ipc_methods(other.method_names());
2023 self.merge_ipc(other)
2024 }
2025
2026 pub fn add_or_replace_configured(
2028 &mut self,
2029 other: impl Into<Methods>,
2030 ) -> Result<(), RegisterMethodError> {
2031 let other = other.into();
2032 self.add_or_replace_http(other.clone())?;
2033 self.add_or_replace_ws(other.clone())?;
2034 self.add_or_replace_ipc(other)?;
2035 Ok(())
2036 }
2037 pub fn add_or_replace_if_module_configured(
2040 &mut self,
2041 module: RethRpcModule,
2042 other: impl Into<Methods>,
2043 ) -> Result<(), RegisterMethodError> {
2044 let other = other.into();
2045 if self.module_config().contains_http(&module) {
2046 self.add_or_replace_http(other.clone())?;
2047 }
2048 if self.module_config().contains_ws(&module) {
2049 self.add_or_replace_ws(other.clone())?;
2050 }
2051 if self.module_config().contains_ipc(&module) {
2052 self.add_or_replace_ipc(other)?;
2053 }
2054 Ok(())
2055 }
2056}
2057
2058fn methods_by<T, F>(module: &RpcModule<T>, mut filter: F) -> Methods
2060where
2061 F: FnMut(&str) -> bool,
2062{
2063 let mut methods = Methods::new();
2064 let method_names = module.method_names().filter(|name| filter(name));
2065
2066 for name in method_names {
2067 if let Some(matched_method) = module.method(name).cloned() {
2068 let _ = methods.verify_and_insert(name, matched_method);
2069 }
2070 }
2071
2072 methods
2073}
2074
/// Handle to the RPC servers (HTTP, WS, IPC) that were started, together with the
/// information needed to connect to them.
#[derive(Clone, Debug)]
#[must_use = "Server stops if dropped"]
pub struct RpcServerHandle {
    /// Local address of the HTTP server, if one was bound.
    http_local_addr: Option<SocketAddr>,
    /// Local address of the WS server, if one was bound.
    ws_local_addr: Option<SocketAddr>,
    /// Handle of the running HTTP server, if any.
    http: Option<ServerHandle>,
    /// Handle of the running WS server, if any.
    ws: Option<ServerHandle>,
    /// IPC endpoint that was configured, if any.
    ipc_endpoint: Option<String>,
    /// Handle of the running IPC server, if any.
    ipc: Option<jsonrpsee::server::ServerHandle>,
    /// JWT secret used to create bearer tokens for the clients built from this handle.
    jwt_secret: Option<JwtSecret>,
}
2091
2092impl RpcServerHandle {
2095 fn bearer_token(&self) -> Option<String> {
2097 self.jwt_secret.as_ref().map(|secret| {
2098 format!(
2099 "Bearer {}",
2100 secret
2101 .encode(&Claims {
2102 iat: (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +
2103 Duration::from_secs(60))
2104 .as_secs(),
2105 exp: None,
2106 })
2107 .unwrap()
2108 )
2109 })
2110 }
2111 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
2113 self.http_local_addr
2114 }
2115
2116 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
2118 self.ws_local_addr
2119 }
2120
2121 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
2123 if let Some(handle) = self.http {
2124 handle.stop()?
2125 }
2126
2127 if let Some(handle) = self.ws {
2128 handle.stop()?
2129 }
2130
2131 if let Some(handle) = self.ipc {
2132 handle.stop()?
2133 }
2134
2135 Ok(())
2136 }
2137
2138 pub fn ipc_endpoint(&self) -> Option<String> {
2140 self.ipc_endpoint.clone()
2141 }
2142
2143 pub fn http_url(&self) -> Option<String> {
2145 self.http_local_addr.map(|addr| format!("http://{addr}"))
2146 }
2147
2148 pub fn ws_url(&self) -> Option<String> {
2150 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
2151 }
2152
2153 pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
2155 let url = self.http_url()?;
2156
2157 let client = if let Some(token) = self.bearer_token() {
2158 jsonrpsee::http_client::HttpClientBuilder::default()
2159 .set_headers(HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]))
2160 .build(url)
2161 } else {
2162 jsonrpsee::http_client::HttpClientBuilder::default().build(url)
2163 };
2164
2165 client.expect("failed to create http client").into()
2166 }
2167
2168 pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
2170 let url = self.ws_url()?;
2171 let mut builder = jsonrpsee::ws_client::WsClientBuilder::default();
2172
2173 if let Some(token) = self.bearer_token() {
2174 let headers = HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]);
2175 builder = builder.set_headers(headers);
2176 }
2177
2178 let client = builder.build(url).await.expect("failed to create ws client");
2179 Some(client)
2180 }
2181
2182 pub fn eth_http_provider(
2184 &self,
2185 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2186 self.new_http_provider_for()
2187 }
2188
2189 pub fn eth_http_provider_with_wallet<W>(
2192 &self,
2193 wallet: W,
2194 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2195 where
2196 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2197 {
2198 let rpc_url = self.http_url()?;
2199 let provider =
2200 ProviderBuilder::new().wallet(wallet).connect_http(rpc_url.parse().expect("valid url"));
2201 Some(provider)
2202 }
2203
2204 pub fn new_http_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2209 where
2210 N: RecommendedFillers<RecommendedFillers: Unpin>,
2211 {
2212 let rpc_url = self.http_url()?;
2213 let provider = ProviderBuilder::default()
2214 .with_recommended_fillers()
2215 .connect_http(rpc_url.parse().expect("valid url"));
2216 Some(provider)
2217 }
2218
2219 pub async fn eth_ws_provider(
2221 &self,
2222 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2223 self.new_ws_provider_for().await
2224 }
2225
2226 pub async fn eth_ws_provider_with_wallet<W>(
2229 &self,
2230 wallet: W,
2231 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2232 where
2233 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2234 {
2235 let rpc_url = self.ws_url()?;
2236 let provider = ProviderBuilder::new()
2237 .wallet(wallet)
2238 .connect(&rpc_url)
2239 .await
2240 .expect("failed to create ws client");
2241 Some(provider)
2242 }
2243
2244 pub async fn new_ws_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2249 where
2250 N: RecommendedFillers<RecommendedFillers: Unpin>,
2251 {
2252 let rpc_url = self.ws_url()?;
2253 let provider = ProviderBuilder::default()
2254 .with_recommended_fillers()
2255 .connect(&rpc_url)
2256 .await
2257 .expect("failed to create ws client");
2258 Some(provider)
2259 }
2260
2261 pub async fn eth_ipc_provider(
2263 &self,
2264 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2265 self.new_ipc_provider_for().await
2266 }
2267
2268 pub async fn new_ipc_provider_for<N>(
2273 &self,
2274 ) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2275 where
2276 N: RecommendedFillers<RecommendedFillers: Unpin>,
2277 {
2278 let rpc_url = self.ipc_endpoint()?;
2279 let provider = ProviderBuilder::default()
2280 .with_recommended_fillers()
2281 .connect(&rpc_url)
2282 .await
2283 .expect("failed to create ipc client");
2284 Some(provider)
2285 }
2286}
2287
2288#[cfg(test)]
2289mod tests {
2290 use super::*;
2291
/// A comma separated list parses into an explicit selection of exactly those modules.
#[test]
fn parse_eth_call_bundle_selection() {
    let parsed: RpcModuleSelection = "eth,admin,debug".parse().unwrap();
    let expected = RpcModuleSelection::Selection(
        [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Debug].into(),
    );
    assert_eq!(parsed, expected);
}
2302
/// The keyword `all` parses into [`RpcModuleSelection::All`].
#[test]
fn parse_rpc_module_selection() {
    let parsed: RpcModuleSelection = "all".parse().unwrap();
    assert_eq!(parsed, RpcModuleSelection::All);
}
2308
/// The keyword `none` parses into an empty explicit selection.
#[test]
fn parse_rpc_module_selection_none() {
    let parsed: RpcModuleSelection = "none".parse().unwrap();
    let expected = RpcModuleSelection::Selection(Default::default());
    assert_eq!(parsed, expected);
}
2314
/// Modules listed more than once end up in the selection only once.
#[test]
fn parse_rpc_unique_module_selection() {
    let parsed: RpcModuleSelection = "eth,admin,eth,net".parse().unwrap();
    let expected = RpcModuleSelection::Selection(
        [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Net].into(),
    );
    assert_eq!(parsed, expected);
}
2325
/// Exercises [`RpcModuleSelection::are_identical`] for equal, different and missing
/// selections.
#[test]
fn identical_selection() {
    let all = RpcModuleSelection::All;
    let standard = RpcModuleSelection::Standard;
    let standard_expanded =
        RpcModuleSelection::Selection(RpcModuleSelection::Standard.to_selection());
    let eth_left = RpcModuleSelection::Selection([RethRpcModule::Eth].into());
    let eth_right = RpcModuleSelection::Selection([RethRpcModule::Eth].into());
    let empty = RpcModuleSelection::Selection(Default::default());

    // Same variant on both sides.
    assert!(RpcModuleSelection::are_identical(Some(&all), Some(&all)));
    // `All` and `Standard` differ.
    assert!(!RpcModuleSelection::are_identical(Some(&all), Some(&standard)));
    // An explicit selection equal to the standard modules is identical to `Standard`.
    assert!(RpcModuleSelection::are_identical(Some(&standard_expanded), Some(&standard)));
    // Equal explicit selections.
    assert!(RpcModuleSelection::are_identical(Some(&eth_left), Some(&eth_right)));
    // A missing selection is identical to an empty one, in both positions.
    assert!(RpcModuleSelection::are_identical(None, Some(&empty)));
    assert!(RpcModuleSelection::are_identical(Some(&empty), None));
    // Two missing selections are identical.
    assert!(RpcModuleSelection::are_identical(None, None));
}
2354
/// Every module name parses into its variant and is printed back as the same name.
#[test]
fn test_rpc_module_str() {
    let cases = [
        ("admin", RethRpcModule::Admin),
        ("debug", RethRpcModule::Debug),
        ("eth", RethRpcModule::Eth),
        ("net", RethRpcModule::Net),
        ("trace", RethRpcModule::Trace),
        ("web3", RethRpcModule::Web3),
        ("rpc", RethRpcModule::Rpc),
        ("ots", RethRpcModule::Ots),
        ("reth", RethRpcModule::Reth),
    ];

    for (name, module) in cases {
        let parsed: RethRpcModule = name.parse().unwrap();
        assert_eq!(parsed, module);
        assert_eq!(parsed.to_string(), name);
    }
}
2379
/// The standard selection expands to the `eth`, `net` and `web3` modules.
#[test]
fn test_default_selection() {
    let standard = RpcModuleSelection::Standard.to_selection();
    assert_eq!(standard, [RethRpcModule::Eth, RethRpcModule::Net, RethRpcModule::Web3].into());
}
2385
/// A list of module names is turned into the matching explicit selection.
#[test]
fn test_create_rpc_module_config() {
    let names = vec!["eth", "admin"];
    let selection = RpcModuleSelection::try_from_selection(names).unwrap();
    let expected =
        RpcModuleSelection::Selection([RethRpcModule::Eth, RethRpcModule::Admin].into());
    assert_eq!(selection, expected);
}
2395
/// `with_http` sets only the HTTP selection and leaves every other field unset.
#[test]
fn test_configure_transport_config() {
    let expected = TransportRpcModuleConfig {
        http: Some(RpcModuleSelection::Selection(
            [RethRpcModule::Eth, RethRpcModule::Admin].into(),
        )),
        ws: None,
        ipc: None,
        config: None,
    };
    let configured = TransportRpcModuleConfig::default()
        .with_http([RethRpcModule::Eth, RethRpcModule::Admin]);
    assert_eq!(configured, expected);
}
2412
/// An empty module list results in an empty, but present, HTTP selection.
#[test]
fn test_configure_transport_config_none() {
    let expected = TransportRpcModuleConfig {
        http: Some(RpcModuleSelection::Selection(Default::default())),
        ws: None,
        ipc: None,
        config: None,
    };
    let configured = TransportRpcModuleConfig::default().with_http(Vec::<RethRpcModule>::new());
    assert_eq!(configured, expected);
}
2426
2427 fn create_test_module() -> RpcModule<()> {
2428 let mut module = RpcModule::new(());
2429 module.register_method("anything", |_, _, _| "succeed").unwrap();
2430 module
2431 }
2432
/// Removing a method from the HTTP module reports whether the method existed.
#[test]
fn test_remove_http_method() {
    let mut transports =
        TransportRpcModules { http: Some(create_test_module()), ..Default::default() };

    // An existing method is removed.
    let removed_existing = transports.remove_http_method("anything");
    assert!(removed_existing);

    // An unknown method is reported as not removed.
    let removed_unknown = transports.remove_http_method("non_existent_method");
    assert!(!removed_unknown);

    // The removed method is gone from the module.
    let http = transports.http.as_ref().unwrap();
    assert!(http.method("anything").is_none());
}
2446
/// Removing a method from the WS module reports whether the method existed.
#[test]
fn test_remove_ws_method() {
    let mut transports =
        TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };

    // An existing method is removed.
    let removed_existing = transports.remove_ws_method("anything");
    assert!(removed_existing);

    // An unknown method is reported as not removed.
    let removed_unknown = transports.remove_ws_method("non_existent_method");
    assert!(!removed_unknown);

    // The removed method is gone from the module.
    let ws = transports.ws.as_ref().unwrap();
    assert!(ws.method("anything").is_none());
}
2461
/// Removing a method from the IPC module reports whether the method existed.
#[test]
fn test_remove_ipc_method() {
    let mut transports =
        TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };

    // An existing method is removed.
    let removed_existing = transports.remove_ipc_method("anything");
    assert!(removed_existing);

    // An unknown method is reported as not removed.
    let removed_unknown = transports.remove_ipc_method("non_existent_method");
    assert!(!removed_unknown);

    // The removed method is gone from the module.
    let ipc = transports.ipc.as_ref().unwrap();
    assert!(ipc.method("anything").is_none());
}
2476
/// Removing a method from all transports succeeds once and reports `false` afterwards.
#[test]
fn test_remove_method_from_configured() {
    let mut transports = TransportRpcModules {
        http: Some(create_test_module()),
        ws: Some(create_test_module()),
        ipc: Some(create_test_module()),
        ..Default::default()
    };

    // The first removal finds the method.
    assert!(transports.remove_method_from_configured("anything"));

    // The second removal has nothing left to remove.
    assert!(!transports.remove_method_from_configured("anything"));

    // A method that never existed is reported as not removed.
    assert!(!transports.remove_method_from_configured("non_existent_method"));

    // The method is gone from every transport.
    for transport in [&transports.http, &transports.ws, &transports.ipc] {
        assert!(transport.as_ref().unwrap().method("anything").is_none());
    }
}
2500
/// `rename` removes the old method from every transport and installs the new one.
#[test]
fn test_transport_rpc_module_rename() {
    let mut transports = TransportRpcModules {
        http: Some(create_test_module()),
        ws: Some(create_test_module()),
        ipc: Some(create_test_module()),
        ..Default::default()
    };

    // Before: only `anything` is registered on every transport.
    for transport in [&transports.http, &transports.ws, &transports.ipc] {
        let module = transport.as_ref().unwrap();
        assert!(module.method("anything").is_some());
        assert!(module.method("something").is_none());
    }

    // The replacement provides a method called `something`.
    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    transports.rename("anything", replacement).expect("rename failed");

    // After: `anything` is gone and `something` is registered on every transport.
    for transport in [&transports.http, &transports.ws, &transports.ipc] {
        let module = transport.as_ref().unwrap();
        assert!(module.method("anything").is_none());
        assert!(module.method("something").is_some());
    }
}
2537
/// `replace_http` adds new methods and replaces methods that already exist.
#[test]
fn test_replace_http_method() {
    let mut transports =
        TransportRpcModules { http: Some(create_test_module()), ..Default::default() };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    // A method that does not exist yet is simply added.
    let replaced = transports.replace_http(replacement.clone()).unwrap();
    assert!(replaced);
    assert!(transports.http.as_ref().unwrap().method("something").is_some());

    // Now both names already exist on the HTTP module and get replaced.
    replacement.register_method("anything", |_, _, _| "fails").unwrap();
    let replaced = transports.replace_http(replacement.clone()).unwrap();
    assert!(replaced);
    assert!(transports.http.as_ref().unwrap().method("anything").is_some());
}
/// `replace_ipc` adds new methods and replaces methods that already exist.
#[test]
fn test_replace_ipc_method() {
    let mut transports =
        TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    // A method that does not exist yet is simply added.
    let replaced = transports.replace_ipc(replacement.clone()).unwrap();
    assert!(replaced);
    assert!(transports.ipc.as_ref().unwrap().method("something").is_some());

    // Now both names already exist on the IPC module and get replaced.
    replacement.register_method("anything", |_, _, _| "fails").unwrap();
    let replaced = transports.replace_ipc(replacement.clone()).unwrap();
    assert!(replaced);
    assert!(transports.ipc.as_ref().unwrap().method("anything").is_some());
}
/// `replace_ws` adds new methods and replaces methods that already exist.
#[test]
fn test_replace_ws_method() {
    let mut transports =
        TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    // A method that does not exist yet is simply added.
    let replaced = transports.replace_ws(replacement.clone()).unwrap();
    assert!(replaced);
    assert!(transports.ws.as_ref().unwrap().method("something").is_some());

    // Now both names already exist on the WS module and get replaced.
    replacement.register_method("anything", |_, _, _| "fails").unwrap();
    let replaced = transports.replace_ws(replacement.clone()).unwrap();
    assert!(replaced);
    assert!(transports.ws.as_ref().unwrap().method("anything").is_some());
}
2589
/// `replace_configured` installs the new methods on every transport and keeps the
/// unrelated existing ones.
#[test]
fn test_replace_configured() {
    let mut transports = TransportRpcModules {
        http: Some(create_test_module()),
        ws: Some(create_test_module()),
        ipc: Some(create_test_module()),
        ..Default::default()
    };
    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    let replaced = transports.replace_configured(replacement).unwrap();
    assert!(replaced);

    // The new method is available on every transport.
    for transport in [&transports.http, &transports.ipc, &transports.ws] {
        assert!(transport.as_ref().unwrap().method("something").is_some());
    }

    // The method that was registered before is still available on every transport.
    for transport in [&transports.http, &transports.ipc, &transports.ws] {
        assert!(transport.as_ref().unwrap().method("anything").is_some());
    }
}
2612
/// Checks that `add_or_replace_if_module_configured` succeeds and that, with `Eth`
/// configured for HTTP and WS only, the new methods show up on those two transports
/// while the IPC module stays without them.
#[test]
fn test_add_or_replace_if_module_configured() {
    // `Eth` is configured for HTTP and WS; nothing is configured for IPC.
    let config = TransportRpcModuleConfig::default()
        .with_http([RethRpcModule::Eth])
        .with_ws([RethRpcModule::Eth]);

    // HTTP and WS each start out with their own "eth_existing" method.
    let seeded_module = || {
        let mut module = RpcModule::new(());
        module.register_method("eth_existing", |_, _, _| "original").unwrap();
        module
    };

    let mut transports = TransportRpcModules {
        config,
        http: Some(seeded_module()),
        ws: Some(seeded_module()),
        // IPC starts out with an empty module.
        ipc: Some(RpcModule::new(())),
    };

    // Incoming methods: one name that already exists plus one brand-new name.
    let mut incoming = RpcModule::new(());
    incoming.register_method("eth_existing", |_, _, _| "replaced").unwrap();
    incoming.register_method("eth_new", |_, _, _| "added").unwrap();
    let new_methods = Methods::from(incoming);

    let outcome =
        transports.add_or_replace_if_module_configured(RethRpcModule::Eth, new_methods);
    assert!(outcome.is_ok(), "Function should succeed");

    // HTTP: both names are present.
    let http = transports.http.as_ref().unwrap();
    assert!(http.method("eth_existing").is_some());
    assert!(http.method("eth_new").is_some());

    // WS: both names are present.
    let ws = transports.ws.as_ref().unwrap();
    assert!(ws.method("eth_existing").is_some());
    assert!(ws.method("eth_new").is_some());

    // IPC: neither name is present.
    let ipc = transports.ipc.as_ref().unwrap();
    assert!(ipc.method("eth_existing").is_none());
    assert!(ipc.method("eth_new").is_none());
}
2664
/// Checks that `merge_if_module_configured_with` runs the method-building closure when
/// the requested module is configured and skips it when the module is not configured.
#[test]
fn test_merge_if_module_configured_with_lazy_evaluation() {
    // `Eth` on HTTP is the only configured module; WS and IPC are absent.
    let config = TransportRpcModuleConfig::default().with_http([RethRpcModule::Eth]);
    let mut transports =
        TransportRpcModules { config, http: Some(RpcModule::new(())), ws: None, ipc: None };

    // Configured module: the closure has to run and its method has to be merged.
    let mut eth_closure_ran = false;
    let eth_outcome = transports.merge_if_module_configured_with(RethRpcModule::Eth, || {
        eth_closure_ran = true;
        let mut methods = RpcModule::new(());
        methods.register_method("eth_test", |_, _, _| "test").unwrap();
        methods.into()
    });

    assert!(eth_outcome.is_ok());
    assert!(eth_closure_ran, "Closure should be called when module is configured");
    assert!(transports.http.as_ref().unwrap().method("eth_test").is_some());

    // Unconfigured module: the call still succeeds, but the closure must stay untouched.
    let mut debug_closure_ran = false;
    let debug_outcome = transports.merge_if_module_configured_with(RethRpcModule::Debug, || {
        debug_closure_ran = true;
        RpcModule::new(()).into()
    });

    assert!(debug_outcome.is_ok());
    assert!(!debug_closure_ran, "Closure should NOT be called when module is not configured");
}
2698}