1#![doc(
15 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
16 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
17 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
18)]
19#![cfg_attr(not(test), warn(unused_crate_dependencies))]
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
23use alloy_network::{Ethereum, IntoWallet};
24use alloy_provider::{fillers::RecommendedFillers, Provider, ProviderBuilder};
25use core::marker::PhantomData;
26use error::{ConflictingModules, RpcError, ServerKind};
27use http::{header::AUTHORIZATION, HeaderMap};
28use jsonrpsee::{
29 core::RegisterMethodError,
30 server::{middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider, ServerHandle},
31 Methods, RpcModule,
32};
33use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
34use reth_consensus::FullConsensus;
35use reth_engine_primitives::ConsensusEngineEvent;
36use reth_evm::ConfigureEvm;
37use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
38use reth_primitives_traits::{NodePrimitives, TxTy};
39use reth_rpc::{
40 AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
41 OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
42};
43use reth_rpc_api::servers::*;
44use reth_rpc_eth_api::{
45 helpers::{
46 pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
47 TraceExt,
48 },
49 node::RpcNodeCoreAdapter,
50 EthApiServer, EthApiTypes, FullEthApiServer, FullEthApiTypes, RpcBlock, RpcConvert,
51 RpcConverter, RpcHeader, RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
52};
53use reth_rpc_eth_types::{receipt::EthReceiptConverter, EthConfig, EthSubscriptionIdProvider};
54use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
55pub use reth_rpc_server_types::RethRpcModule;
56use reth_storage_api::{
57 AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, NodePrimitivesProvider,
58 StateProviderFactory,
59};
60use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
61use reth_tokio_util::EventSender;
62use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
63use serde::{Deserialize, Serialize};
64use std::{
65 collections::HashMap,
66 fmt::Debug,
67 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
68 time::{Duration, SystemTime, UNIX_EPOCH},
69};
70use tower_http::cors::CorsLayer;
71
72pub use cors::CorsDomainError;
73
74pub use jsonrpsee::server::ServerBuilder;
76use jsonrpsee::server::ServerConfigBuilder;
77pub use reth_ipc::server::{
78 Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
79};
80pub use reth_rpc_server_types::{constants, RpcModuleSelection};
81pub use tower::layer::util::{Identity, Stack};
82
83pub mod auth;
85
86pub mod config;
88
89pub mod middleware;
91
92mod cors;
94
95pub mod error;
97
98pub mod eth;
100pub use eth::EthHandlers;
101
102mod metrics;
104use crate::middleware::RethRpcMiddleware;
105pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
106use reth_chain_state::{
107 CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions,
108};
109use reth_rpc::eth::sim_bundle::EthSimBundle;
110
111pub mod rate_limiter;
113
114#[derive(Debug, Clone)]
118pub struct RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus> {
119 provider: Provider,
121 pool: Pool,
123 network: Network,
125 executor: Box<dyn TaskSpawner + 'static>,
127 evm_config: EvmConfig,
129 consensus: Consensus,
131 _primitives: PhantomData<N>,
133}
134
135impl<N, Provider, Pool, Network, EvmConfig, Consensus>
138 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
139{
140 pub const fn new(
142 provider: Provider,
143 pool: Pool,
144 network: Network,
145 executor: Box<dyn TaskSpawner + 'static>,
146 evm_config: EvmConfig,
147 consensus: Consensus,
148 ) -> Self {
149 Self { provider, pool, network, executor, evm_config, consensus, _primitives: PhantomData }
150 }
151
152 pub fn with_provider<P>(
154 self,
155 provider: P,
156 ) -> RpcModuleBuilder<N, P, Pool, Network, EvmConfig, Consensus> {
157 let Self { pool, network, executor, evm_config, consensus, _primitives, .. } = self;
158 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
159 }
160
161 pub fn with_pool<P>(
163 self,
164 pool: P,
165 ) -> RpcModuleBuilder<N, Provider, P, Network, EvmConfig, Consensus> {
166 let Self { provider, network, executor, evm_config, consensus, _primitives, .. } = self;
167 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
168 }
169
170 pub fn with_noop_pool(
176 self,
177 ) -> RpcModuleBuilder<N, Provider, NoopTransactionPool, Network, EvmConfig, Consensus> {
178 let Self { provider, executor, network, evm_config, consensus, _primitives, .. } = self;
179 RpcModuleBuilder {
180 provider,
181 executor,
182 network,
183 evm_config,
184 pool: NoopTransactionPool::default(),
185 consensus,
186 _primitives,
187 }
188 }
189
190 pub fn with_network<Net>(
192 self,
193 network: Net,
194 ) -> RpcModuleBuilder<N, Provider, Pool, Net, EvmConfig, Consensus> {
195 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
196 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
197 }
198
199 pub fn with_noop_network(
205 self,
206 ) -> RpcModuleBuilder<N, Provider, Pool, NoopNetwork, EvmConfig, Consensus> {
207 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
208 RpcModuleBuilder {
209 provider,
210 pool,
211 executor,
212 network: NoopNetwork::default(),
213 evm_config,
214 consensus,
215 _primitives,
216 }
217 }
218
219 pub fn with_executor(self, executor: Box<dyn TaskSpawner + 'static>) -> Self {
221 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
222 Self { provider, network, pool, executor, evm_config, consensus, _primitives }
223 }
224
225 pub fn with_tokio_executor(self) -> Self {
230 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
231 Self {
232 provider,
233 network,
234 pool,
235 executor: Box::new(TokioTaskExecutor::default()),
236 evm_config,
237 consensus,
238 _primitives,
239 }
240 }
241
242 pub fn with_evm_config<E>(
244 self,
245 evm_config: E,
246 ) -> RpcModuleBuilder<N, Provider, Pool, Network, E, Consensus> {
247 let Self { provider, pool, executor, network, consensus, _primitives, .. } = self;
248 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
249 }
250
251 pub fn with_consensus<C>(
253 self,
254 consensus: C,
255 ) -> RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, C> {
256 let Self { provider, network, pool, executor, evm_config, _primitives, .. } = self;
257 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
258 }
259
260 #[expect(clippy::type_complexity)]
262 pub fn eth_api_builder<ChainSpec>(
263 &self,
264 ) -> EthApiBuilder<
265 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
266 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
267 >
268 where
269 Provider: Clone,
270 Pool: Clone,
271 Network: Clone,
272 EvmConfig: Clone,
273 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
274 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
275 {
276 EthApiBuilder::new(
277 self.provider.clone(),
278 self.pool.clone(),
279 self.network.clone(),
280 self.evm_config.clone(),
281 )
282 }
283
284 #[expect(clippy::type_complexity)]
290 pub fn bootstrap_eth_api<ChainSpec>(
291 &self,
292 ) -> EthApi<
293 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
294 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
295 >
296 where
297 Provider: Clone,
298 Pool: Clone,
299 Network: Clone,
300 EvmConfig: ConfigureEvm + Clone,
301 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
302 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
303 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>: RpcConvert,
304 (): PendingEnvBuilder<EvmConfig>,
305 {
306 self.eth_api_builder().build()
307 }
308}
309
310impl<N, Provider, Pool, Network, EvmConfig, Consensus>
311 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
312where
313 N: NodePrimitives,
314 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
315 + CanonStateSubscriptions<Primitives = N>
316 + ForkChoiceSubscriptions<Header = N::BlockHeader>
317 + PersistedBlockSubscriptions
318 + AccountReader
319 + ChangeSetReader,
320 Pool: TransactionPool + Clone + 'static,
321 Network: NetworkInfo + Peers + Clone + 'static,
322 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
323 Consensus: FullConsensus<N> + Clone + 'static,
324{
325 pub fn build_with_auth_server<EthApi>(
332 self,
333 module_config: TransportRpcModuleConfig,
334 engine: impl IntoEngineApiRpcModule,
335 eth: EthApi,
336 engine_events: EventSender<ConsensusEngineEvent<N>>,
337 ) -> (
338 TransportRpcModules,
339 AuthRpcModule,
340 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>,
341 )
342 where
343 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
344 {
345 let config = module_config.config.clone().unwrap_or_default();
346
347 let mut registry = self.into_registry(config, eth, engine_events);
348 let modules = registry.create_transport_rpc_modules(module_config);
349 let auth_module = registry.create_auth_module(engine);
350
351 (modules, auth_module, registry)
352 }
353
354 pub fn into_registry<EthApi>(
359 self,
360 config: RpcModuleConfig,
361 eth: EthApi,
362 engine_events: EventSender<ConsensusEngineEvent<N>>,
363 ) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
364 where
365 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
366 {
367 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
368 RpcRegistryInner::new(
369 provider,
370 pool,
371 network,
372 executor,
373 consensus,
374 config,
375 evm_config,
376 eth,
377 engine_events,
378 )
379 }
380
381 pub fn build<EthApi>(
384 self,
385 module_config: TransportRpcModuleConfig,
386 eth: EthApi,
387 engine_events: EventSender<ConsensusEngineEvent<N>>,
388 ) -> TransportRpcModules<()>
389 where
390 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
391 {
392 let mut modules = TransportRpcModules::default();
393
394 if !module_config.is_empty() {
395 let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
396
397 let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
398
399 modules.config = module_config;
400 modules.http = registry.maybe_module(http.as_ref());
401 modules.ws = registry.maybe_module(ws.as_ref());
402 modules.ipc = registry.maybe_module(ipc.as_ref());
403 }
404
405 modules
406 }
407}
408
409impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
410 fn default() -> Self {
411 Self::new((), (), (), Box::new(TokioTaskExecutor::default()), (), ())
412 }
413}
414
415#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
417pub struct RpcModuleConfig {
418 eth: EthConfig,
420}
421
422impl RpcModuleConfig {
425 pub fn builder() -> RpcModuleConfigBuilder {
427 RpcModuleConfigBuilder::default()
428 }
429
430 pub const fn new(eth: EthConfig) -> Self {
432 Self { eth }
433 }
434
435 pub const fn eth(&self) -> &EthConfig {
437 &self.eth
438 }
439
440 pub const fn eth_mut(&mut self) -> &mut EthConfig {
442 &mut self.eth
443 }
444}
445
446#[derive(Clone, Debug, Default)]
448pub struct RpcModuleConfigBuilder {
449 eth: Option<EthConfig>,
450}
451
452impl RpcModuleConfigBuilder {
455 pub fn eth(mut self, eth: EthConfig) -> Self {
457 self.eth = Some(eth);
458 self
459 }
460
461 pub fn build(self) -> RpcModuleConfig {
463 let Self { eth } = self;
464 RpcModuleConfig { eth: eth.unwrap_or_default() }
465 }
466
467 pub const fn get_eth(&self) -> Option<&EthConfig> {
469 self.eth.as_ref()
470 }
471
472 pub const fn eth_mut(&mut self) -> &mut Option<EthConfig> {
474 &mut self.eth
475 }
476
477 pub fn eth_mut_or_default(&mut self) -> &mut EthConfig {
479 self.eth.get_or_insert_with(EthConfig::default)
480 }
481}
482
483#[derive(Debug)]
485pub struct RpcRegistryInner<Provider, Pool, Network, EthApi: EthApiTypes, EvmConfig, Consensus> {
486 provider: Provider,
487 pool: Pool,
488 network: Network,
489 executor: Box<dyn TaskSpawner + 'static>,
490 evm_config: EvmConfig,
491 consensus: Consensus,
492 eth: EthHandlers<EthApi>,
494 blocking_pool_guard: BlockingTaskGuard,
496 modules: HashMap<RethRpcModule, Methods>,
498 eth_config: EthConfig,
500 engine_events:
502 EventSender<ConsensusEngineEvent<<EthApi::RpcConvert as RpcConvert>::Primitives>>,
503}
504
505impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
508 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
509where
510 N: NodePrimitives,
511 Provider: StateProviderFactory
512 + CanonStateSubscriptions<Primitives = N>
513 + BlockReader<Block = N::Block, Receipt = N::Receipt>
514 + Clone
515 + Unpin
516 + 'static,
517 Pool: Send + Sync + Clone + 'static,
518 Network: Clone + 'static,
519 EthApi: FullEthApiTypes + 'static,
520 EvmConfig: ConfigureEvm<Primitives = N>,
521{
522 #[expect(clippy::too_many_arguments)]
524 pub fn new(
525 provider: Provider,
526 pool: Pool,
527 network: Network,
528 executor: Box<dyn TaskSpawner + 'static>,
529 consensus: Consensus,
530 config: RpcModuleConfig,
531 evm_config: EvmConfig,
532 eth_api: EthApi,
533 engine_events: EventSender<
534 ConsensusEngineEvent<<EthApi::Provider as NodePrimitivesProvider>::Primitives>,
535 >,
536 ) -> Self
537 where
538 EvmConfig: ConfigureEvm<Primitives = N>,
539 {
540 let blocking_pool_guard = BlockingTaskGuard::new(config.eth.max_tracing_requests);
541
542 let eth = EthHandlers::bootstrap(config.eth.clone(), executor.clone(), eth_api);
543
544 Self {
545 provider,
546 pool,
547 network,
548 eth,
549 executor,
550 consensus,
551 modules: Default::default(),
552 blocking_pool_guard,
553 eth_config: config.eth,
554 evm_config,
555 engine_events,
556 }
557 }
558}
559
560impl<Provider, Pool, Network, EthApi, Evm, Consensus>
561 RpcRegistryInner<Provider, Pool, Network, EthApi, Evm, Consensus>
562where
563 EthApi: EthApiTypes,
564{
565 pub const fn eth_api(&self) -> &EthApi {
567 &self.eth.api
568 }
569
570 pub const fn eth_handlers(&self) -> &EthHandlers<EthApi> {
572 &self.eth
573 }
574
575 pub const fn pool(&self) -> &Pool {
577 &self.pool
578 }
579
580 pub const fn tasks(&self) -> &(dyn TaskSpawner + 'static) {
582 &*self.executor
583 }
584
585 pub const fn provider(&self) -> &Provider {
587 &self.provider
588 }
589
590 pub const fn evm_config(&self) -> &Evm {
592 &self.evm_config
593 }
594
595 pub fn methods(&self) -> Vec<Methods> {
597 self.modules.values().cloned().collect()
598 }
599
600 pub fn module(&self) -> RpcModule<()> {
602 let mut module = RpcModule::new(());
603 for methods in self.modules.values().cloned() {
604 module.merge(methods).expect("No conflicts");
605 }
606 module
607 }
608}
609
610impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
611 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
612where
613 Network: NetworkInfo + Clone + 'static,
614 EthApi: EthApiTypes,
615 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks>,
616 EvmConfig: ConfigureEvm,
617{
618 pub fn admin_api(&self) -> AdminApi<Network, Provider::ChainSpec, Pool>
620 where
621 Network: Peers,
622 Pool: TransactionPool + Clone + 'static,
623 {
624 AdminApi::new(self.network.clone(), self.provider.chain_spec(), self.pool.clone())
625 }
626
627 pub fn web3_api(&self) -> Web3Api<Network> {
629 Web3Api::new(self.network.clone())
630 }
631
632 pub fn register_admin(&mut self) -> &mut Self
634 where
635 Network: Peers,
636 Pool: TransactionPool + Clone + 'static,
637 {
638 let adminapi = self.admin_api();
639 self.modules.insert(RethRpcModule::Admin, adminapi.into_rpc().into());
640 self
641 }
642
643 pub fn register_web3(&mut self) -> &mut Self {
645 let web3api = self.web3_api();
646 self.modules.insert(RethRpcModule::Web3, web3api.into_rpc().into());
647 self
648 }
649}
650
651impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
652 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
653where
654 N: NodePrimitives,
655 Provider: FullRpcProvider<
656 Header = N::BlockHeader,
657 Block = N::Block,
658 Receipt = N::Receipt,
659 Transaction = N::SignedTx,
660 > + AccountReader
661 + ChangeSetReader
662 + CanonStateSubscriptions<Primitives = N>
663 + ForkChoiceSubscriptions<Header = N::BlockHeader>
664 + PersistedBlockSubscriptions,
665 Network: NetworkInfo + Peers + Clone + 'static,
666 EthApi: EthApiServer<
667 RpcTxReq<EthApi::NetworkTypes>,
668 RpcTransaction<EthApi::NetworkTypes>,
669 RpcBlock<EthApi::NetworkTypes>,
670 RpcReceipt<EthApi::NetworkTypes>,
671 RpcHeader<EthApi::NetworkTypes>,
672 TxTy<N>,
673 > + EthApiTypes,
674 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
675{
676 pub fn register_eth(&mut self) -> &mut Self {
682 let eth_api = self.eth_api().clone();
683 self.modules.insert(RethRpcModule::Eth, eth_api.into_rpc().into());
684 self
685 }
686
687 pub fn register_ots(&mut self) -> &mut Self
693 where
694 EthApi: TraceExt + EthTransactions<Primitives = N>,
695 {
696 let otterscan_api = self.otterscan_api();
697 self.modules.insert(RethRpcModule::Ots, otterscan_api.into_rpc().into());
698 self
699 }
700
701 pub fn register_debug(&mut self) -> &mut Self
707 where
708 EthApi: EthTransactions + TraceExt,
709 {
710 let debug_api = self.debug_api();
711 self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
712 self
713 }
714
715 pub fn register_trace(&mut self) -> &mut Self
721 where
722 EthApi: TraceExt,
723 {
724 let trace_api = self.trace_api();
725 self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
726 self
727 }
728
729 pub fn register_net(&mut self) -> &mut Self
737 where
738 EthApi: EthApiSpec + 'static,
739 {
740 let netapi = self.net_api();
741 self.modules.insert(RethRpcModule::Net, netapi.into_rpc().into());
742 self
743 }
744
745 pub fn register_reth(&mut self) -> &mut Self {
753 let rethapi = self.reth_api();
754 self.modules.insert(RethRpcModule::Reth, rethapi.into_rpc().into());
755 self
756 }
757
758 pub fn otterscan_api(&self) -> OtterscanApi<EthApi> {
764 let eth_api = self.eth_api().clone();
765 OtterscanApi::new(eth_api)
766 }
767}
768
769impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
770 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
771where
772 N: NodePrimitives,
773 Provider: FullRpcProvider<
774 Block = N::Block,
775 Header = N::BlockHeader,
776 Transaction = N::SignedTx,
777 Receipt = N::Receipt,
778 > + AccountReader
779 + ChangeSetReader,
780 Network: NetworkInfo + Peers + Clone + 'static,
781 EthApi: EthApiTypes,
782 EvmConfig: ConfigureEvm<Primitives = N>,
783{
784 pub fn trace_api(&self) -> TraceApi<EthApi> {
790 TraceApi::new(
791 self.eth_api().clone(),
792 self.blocking_pool_guard.clone(),
793 self.eth_config.clone(),
794 )
795 }
796
797 pub fn bundle_api(&self) -> EthBundle<EthApi>
803 where
804 EthApi: EthTransactions + LoadPendingBlock + Call,
805 {
806 let eth_api = self.eth_api().clone();
807 EthBundle::new(eth_api, self.blocking_pool_guard.clone())
808 }
809
810 pub fn debug_api(&self) -> DebugApi<EthApi>
816 where
817 EthApi: FullEthApiTypes,
818 {
819 DebugApi::new(
820 self.eth_api().clone(),
821 self.blocking_pool_guard.clone(),
822 self.tasks(),
823 self.engine_events.new_listener(),
824 )
825 }
826
827 pub fn net_api(&self) -> NetApi<Network, EthApi>
833 where
834 EthApi: EthApiSpec + 'static,
835 {
836 let eth_api = self.eth_api().clone();
837 NetApi::new(self.network.clone(), eth_api)
838 }
839
840 pub fn reth_api(&self) -> RethApi<Provider> {
842 RethApi::new(self.provider.clone(), self.executor.clone())
843 }
844}
845
846impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
847 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
848where
849 N: NodePrimitives,
850 Provider: FullRpcProvider<Block = N::Block>
851 + CanonStateSubscriptions<Primitives = N>
852 + ForkChoiceSubscriptions<Header = N::BlockHeader>
853 + PersistedBlockSubscriptions
854 + AccountReader
855 + ChangeSetReader,
856 Pool: TransactionPool + Clone + 'static,
857 Network: NetworkInfo + Peers + Clone + 'static,
858 EthApi: FullEthApiServer,
859 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
860 Consensus: FullConsensus<N> + Clone + 'static,
861{
862 pub fn create_auth_module(&self, engine_api: impl IntoEngineApiRpcModule) -> AuthRpcModule {
868 let mut module = engine_api.into_rpc_module();
869
870 let eth_handlers = self.eth_handlers();
872 let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
873
874 module.merge(engine_eth.into_rpc()).expect("No conflicting methods");
875
876 AuthRpcModule { inner: module }
877 }
878
879 fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
881 config.map(|config| self.module_for(config))
882 }
883
884 pub fn create_transport_rpc_modules(
888 &mut self,
889 config: TransportRpcModuleConfig,
890 ) -> TransportRpcModules<()> {
891 let mut modules = TransportRpcModules::default();
892 let http = self.maybe_module(config.http.as_ref());
893 let ws = self.maybe_module(config.ws.as_ref());
894 let ipc = self.maybe_module(config.ipc.as_ref());
895
896 modules.config = config;
897 modules.http = http;
898 modules.ws = ws;
899 modules.ipc = ipc;
900 modules
901 }
902
903 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
906 let mut module = RpcModule::new(());
907 let all_methods = self.reth_methods(config.iter_selection());
908 for methods in all_methods {
909 module.merge(methods).expect("No conflicts");
910 }
911 module
912 }
913
914 pub fn reth_methods(
923 &mut self,
924 namespaces: impl Iterator<Item = RethRpcModule>,
925 ) -> Vec<Methods> {
926 let EthHandlers { api: eth_api, filter: eth_filter, pubsub: eth_pubsub, .. } =
927 self.eth_handlers().clone();
928
929 let namespaces: Vec<_> = namespaces.collect();
931 namespaces
932 .iter()
933 .map(|namespace| {
934 self.modules
935 .entry(namespace.clone())
936 .or_insert_with(|| match namespace.clone() {
937 RethRpcModule::Admin => AdminApi::new(
938 self.network.clone(),
939 self.provider.chain_spec(),
940 self.pool.clone(),
941 )
942 .into_rpc()
943 .into(),
944 RethRpcModule::Debug => DebugApi::new(
945 eth_api.clone(),
946 self.blocking_pool_guard.clone(),
947 &*self.executor,
948 self.engine_events.new_listener(),
949 )
950 .into_rpc()
951 .into(),
952 RethRpcModule::Eth => {
953 let mut module = eth_api.clone().into_rpc();
955 module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
956 module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
957 module
958 .merge(
959 EthBundle::new(
960 eth_api.clone(),
961 self.blocking_pool_guard.clone(),
962 )
963 .into_rpc(),
964 )
965 .expect("No conflicts");
966
967 module.into()
968 }
969 RethRpcModule::Net => {
970 NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
971 }
972 RethRpcModule::Trace => TraceApi::new(
973 eth_api.clone(),
974 self.blocking_pool_guard.clone(),
975 self.eth_config.clone(),
976 )
977 .into_rpc()
978 .into(),
979 RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
980 RethRpcModule::Txpool => TxPoolApi::new(
981 self.eth.api.pool().clone(),
982 dyn_clone::clone(self.eth.api.converter()),
983 )
984 .into_rpc()
985 .into(),
986 RethRpcModule::Rpc => RPCApi::new(
987 namespaces
988 .iter()
989 .map(|module| (module.to_string(), "1.0".to_string()))
990 .collect(),
991 )
992 .into_rpc()
993 .into(),
994 RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
995 RethRpcModule::Reth => {
996 RethApi::new(self.provider.clone(), self.executor.clone())
997 .into_rpc()
998 .into()
999 }
1000 RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
1001 RethRpcModule::Mev => {
1002 EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
1003 .into_rpc()
1004 .into()
1005 }
1006 RethRpcModule::Flashbots |
1010 RethRpcModule::Testing |
1011 RethRpcModule::Other(_) => Default::default(),
1012 })
1013 .clone()
1014 })
1015 .collect::<Vec<_>>()
1016 }
1017}
1018
1019impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus> Clone
1020 for RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
1021where
1022 EthApi: EthApiTypes,
1023 Provider: Clone,
1024 Pool: Clone,
1025 Network: Clone,
1026 EvmConfig: Clone,
1027 Consensus: Clone,
1028{
1029 fn clone(&self) -> Self {
1030 Self {
1031 provider: self.provider.clone(),
1032 pool: self.pool.clone(),
1033 network: self.network.clone(),
1034 executor: self.executor.clone(),
1035 evm_config: self.evm_config.clone(),
1036 consensus: self.consensus.clone(),
1037 eth: self.eth.clone(),
1038 blocking_pool_guard: self.blocking_pool_guard.clone(),
1039 modules: self.modules.clone(),
1040 eth_config: self.eth_config.clone(),
1041 engine_events: self.engine_events.clone(),
1042 }
1043 }
1044}
1045
1046#[derive(Debug)]
1058pub struct RpcServerConfig<RpcMiddleware = Identity> {
1059 http_server_config: Option<ServerConfigBuilder>,
1061 http_cors_domains: Option<String>,
1063 http_addr: Option<SocketAddr>,
1065 http_disable_compression: bool,
1067 ws_server_config: Option<ServerConfigBuilder>,
1069 ws_cors_domains: Option<String>,
1071 ws_addr: Option<SocketAddr>,
1073 ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
1075 ipc_endpoint: Option<String>,
1077 jwt_secret: Option<JwtSecret>,
1079 rpc_middleware: RpcMiddleware,
1081}
1082
1083impl Default for RpcServerConfig<Identity> {
1086 fn default() -> Self {
1088 Self {
1089 http_server_config: None,
1090 http_cors_domains: None,
1091 http_addr: None,
1092 http_disable_compression: false,
1093 ws_server_config: None,
1094 ws_cors_domains: None,
1095 ws_addr: None,
1096 ipc_server_config: None,
1097 ipc_endpoint: None,
1098 jwt_secret: None,
1099 rpc_middleware: Default::default(),
1100 }
1101 }
1102}
1103
1104impl RpcServerConfig {
1105 pub fn http(config: ServerConfigBuilder) -> Self {
1107 Self::default().with_http(config)
1108 }
1109
1110 pub fn ws(config: ServerConfigBuilder) -> Self {
1112 Self::default().with_ws(config)
1113 }
1114
1115 pub fn ipc(config: IpcServerBuilder<Identity, Identity>) -> Self {
1117 Self::default().with_ipc(config)
1118 }
1119
1120 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
1125 self.http_server_config =
1126 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1127 self
1128 }
1129
1130 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
1135 self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1136 self
1137 }
1138
1139 pub fn with_ipc(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
1144 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1145 self
1146 }
1147}
1148
1149impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
1150 pub fn set_rpc_middleware<T>(self, rpc_middleware: T) -> RpcServerConfig<T> {
1152 RpcServerConfig {
1153 http_server_config: self.http_server_config,
1154 http_cors_domains: self.http_cors_domains,
1155 http_addr: self.http_addr,
1156 http_disable_compression: self.http_disable_compression,
1157 ws_server_config: self.ws_server_config,
1158 ws_cors_domains: self.ws_cors_domains,
1159 ws_addr: self.ws_addr,
1160 ipc_server_config: self.ipc_server_config,
1161 ipc_endpoint: self.ipc_endpoint,
1162 jwt_secret: self.jwt_secret,
1163 rpc_middleware,
1164 }
1165 }
1166
1167 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
1169 self.with_http_cors(cors_domain.clone()).with_ws_cors(cors_domain)
1170 }
1171
1172 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
1174 self.ws_cors_domains = cors_domain;
1175 self
1176 }
1177
1178 pub const fn with_http_disable_compression(mut self, http_disable_compression: bool) -> Self {
1180 self.http_disable_compression = http_disable_compression;
1181 self
1182 }
1183
1184 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
1186 self.http_cors_domains = cors_domain;
1187 self
1188 }
1189
1190 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
1195 self.http_addr = Some(addr);
1196 self
1197 }
1198
1199 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
1204 self.ws_addr = Some(addr);
1205 self
1206 }
1207
1208 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
1212 where
1213 I: IdProvider + Clone + 'static,
1214 {
1215 if let Some(config) = self.http_server_config {
1216 self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
1217 }
1218 if let Some(config) = self.ws_server_config {
1219 self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
1220 }
1221 if let Some(ipc) = self.ipc_server_config {
1222 self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
1223 }
1224
1225 self
1226 }
1227
1228 pub fn with_ipc_endpoint(mut self, path: impl Into<String>) -> Self {
1232 self.ipc_endpoint = Some(path.into());
1233 self
1234 }
1235
1236 pub const fn with_jwt_secret(mut self, secret: Option<JwtSecret>) -> Self {
1238 self.jwt_secret = secret;
1239 self
1240 }
1241
1242 pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
1244 let Some(tokio_runtime) = tokio_runtime else { return self };
1245 if let Some(http_server_config) = self.http_server_config {
1246 self.http_server_config =
1247 Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1248 }
1249 if let Some(ws_server_config) = self.ws_server_config {
1250 self.ws_server_config =
1251 Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1252 }
1253 if let Some(ipc_server_config) = self.ipc_server_config {
1254 self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
1255 }
1256 self
1257 }
1258
1259 pub const fn has_server(&self) -> bool {
1263 self.http_server_config.is_some() ||
1264 self.ws_server_config.is_some() ||
1265 self.ipc_server_config.is_some()
1266 }
1267
1268 pub const fn http_address(&self) -> Option<SocketAddr> {
1270 self.http_addr
1271 }
1272
1273 pub const fn ws_address(&self) -> Option<SocketAddr> {
1275 self.ws_addr
1276 }
1277
1278 pub fn ipc_endpoint(&self) -> Option<String> {
1280 self.ipc_endpoint.clone()
1281 }
1282
1283 fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
1285 cors.as_deref().map(cors::create_cors_layer).transpose()
1286 }
1287
1288 fn maybe_jwt_layer(jwt_secret: Option<JwtSecret>) -> Option<AuthLayer<JwtAuthValidator>> {
1290 jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
1291 }
1292
1293 fn maybe_compression_layer(disable_compression: bool) -> Option<CompressionLayer> {
1296 if disable_compression {
1297 None
1298 } else {
1299 Some(CompressionLayer::new())
1300 }
1301 }
1302
1303 pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError>
1309 where
1310 RpcMiddleware: RethRpcMiddleware,
1311 {
1312 let mut http_handle = None;
1313 let mut ws_handle = None;
1314 let mut ipc_handle = None;
1315
1316 let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
1317 Ipv4Addr::LOCALHOST,
1318 constants::DEFAULT_HTTP_RPC_PORT,
1319 )));
1320
1321 let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
1322 Ipv4Addr::LOCALHOST,
1323 constants::DEFAULT_WS_RPC_PORT,
1324 )));
1325
1326 let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
1327 let ipc_path =
1328 self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());
1329
1330 if let Some(builder) = self.ipc_server_config {
1331 let ipc = builder
1332 .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
1333 .build(ipc_path);
1334 ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
1335 }
1336
1337 if self.http_addr == self.ws_addr &&
1339 self.http_server_config.is_some() &&
1340 self.ws_server_config.is_some()
1341 {
1342 let cors = match (self.ws_cors_domains.as_ref(), self.http_cors_domains.as_ref()) {
1343 (Some(ws_cors), Some(http_cors)) => {
1344 if ws_cors.trim() != http_cors.trim() {
1345 return Err(WsHttpSamePortError::ConflictingCorsDomains {
1346 http_cors_domains: Some(http_cors.clone()),
1347 ws_cors_domains: Some(ws_cors.clone()),
1348 }
1349 .into());
1350 }
1351 Some(ws_cors)
1352 }
1353 (a, b) => a.or(b),
1354 }
1355 .cloned();
1356
1357 modules.config.ensure_ws_http_identical()?;
1359
1360 if let Some(config) = self.http_server_config {
1361 let server = ServerBuilder::new()
1362 .set_http_middleware(
1363 tower::ServiceBuilder::new()
1364 .option_layer(Self::maybe_cors_layer(cors)?)
1365 .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
1366 .option_layer(Self::maybe_compression_layer(
1367 self.http_disable_compression,
1368 )),
1369 )
1370 .set_rpc_middleware(
1371 RpcServiceBuilder::default()
1372 .layer(
1373 modules
1374 .http
1375 .as_ref()
1376 .or(modules.ws.as_ref())
1377 .map(RpcRequestMetrics::same_port)
1378 .unwrap_or_default(),
1379 )
1380 .layer(self.rpc_middleware.clone()),
1381 )
1382 .set_config(config.build())
1383 .build(http_socket_addr)
1384 .await
1385 .map_err(|err| {
1386 RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
1387 })?;
1388 let addr = server.local_addr().map_err(|err| {
1389 RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
1390 })?;
1391 if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
1392 let handle = server.start(module.clone());
1393 http_handle = Some(handle.clone());
1394 ws_handle = Some(handle);
1395 }
1396 return Ok(RpcServerHandle {
1397 http_local_addr: Some(addr),
1398 ws_local_addr: Some(addr),
1399 http: http_handle,
1400 ws: ws_handle,
1401 ipc_endpoint: self.ipc_endpoint.clone(),
1402 ipc: ipc_handle,
1403 jwt_secret: self.jwt_secret,
1404 });
1405 }
1406 }
1407
1408 let mut ws_local_addr = None;
1409 let mut ws_server = None;
1410 let mut http_local_addr = None;
1411 let mut http_server = None;
1412
1413 if let Some(config) = self.ws_server_config {
1414 let server = ServerBuilder::new()
1415 .set_config(config.ws_only().build())
1416 .set_http_middleware(
1417 tower::ServiceBuilder::new()
1418 .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
1419 .option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
1420 )
1421 .set_rpc_middleware(
1422 RpcServiceBuilder::default()
1423 .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default())
1424 .layer(self.rpc_middleware.clone()),
1425 )
1426 .build(ws_socket_addr)
1427 .await
1428 .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
1429
1430 let addr = server
1431 .local_addr()
1432 .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
1433
1434 ws_local_addr = Some(addr);
1435 ws_server = Some(server);
1436 }
1437
1438 if let Some(config) = self.http_server_config {
1439 let server = ServerBuilder::new()
1440 .set_config(config.http_only().build())
1441 .set_http_middleware(
1442 tower::ServiceBuilder::new()
1443 .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
1444 .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
1445 .option_layer(Self::maybe_compression_layer(self.http_disable_compression)),
1446 )
1447 .set_rpc_middleware(
1448 RpcServiceBuilder::default()
1449 .layer(
1450 modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
1451 )
1452 .layer(self.rpc_middleware.clone()),
1453 )
1454 .build(http_socket_addr)
1455 .await
1456 .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
1457 let local_addr = server
1458 .local_addr()
1459 .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
1460 http_local_addr = Some(local_addr);
1461 http_server = Some(server);
1462 }
1463
1464 http_handle = http_server
1465 .map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
1466 ws_handle = ws_server
1467 .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
1468 Ok(RpcServerHandle {
1469 http_local_addr,
1470 ws_local_addr,
1471 http: http_handle,
1472 ws: ws_handle,
1473 ipc_endpoint: self.ipc_endpoint.clone(),
1474 ipc: ipc_handle,
1475 jwt_secret: self.jwt_secret,
1476 })
1477 }
1478}
1479
1480#[derive(Debug, Clone, Default, Eq, PartialEq)]
1492pub struct TransportRpcModuleConfig {
1493 http: Option<RpcModuleSelection>,
1495 ws: Option<RpcModuleSelection>,
1497 ipc: Option<RpcModuleSelection>,
1499 config: Option<RpcModuleConfig>,
1501}
1502
1503impl TransportRpcModuleConfig {
1506 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
1508 Self::default().with_http(http)
1509 }
1510
1511 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
1513 Self::default().with_ws(ws)
1514 }
1515
1516 pub fn set_ipc(ipc: impl Into<RpcModuleSelection>) -> Self {
1518 Self::default().with_ipc(ipc)
1519 }
1520
1521 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
1523 self.http = Some(http.into());
1524 self
1525 }
1526
1527 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
1529 self.ws = Some(ws.into());
1530 self
1531 }
1532
1533 pub fn with_ipc(mut self, ipc: impl Into<RpcModuleSelection>) -> Self {
1535 self.ipc = Some(ipc.into());
1536 self
1537 }
1538
1539 pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
1541 self.config = Some(config);
1542 self
1543 }
1544
1545 pub const fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1547 &mut self.http
1548 }
1549
1550 pub const fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1552 &mut self.ws
1553 }
1554
1555 pub const fn ipc_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1557 &mut self.ipc
1558 }
1559
1560 pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
1562 &mut self.config
1563 }
1564
1565 pub const fn is_empty(&self) -> bool {
1567 self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
1568 }
1569
1570 pub const fn http(&self) -> Option<&RpcModuleSelection> {
1572 self.http.as_ref()
1573 }
1574
1575 pub const fn ws(&self) -> Option<&RpcModuleSelection> {
1577 self.ws.as_ref()
1578 }
1579
1580 pub const fn ipc(&self) -> Option<&RpcModuleSelection> {
1582 self.ipc.as_ref()
1583 }
1584
1585 pub const fn config(&self) -> Option<&RpcModuleConfig> {
1587 self.config.as_ref()
1588 }
1589
1590 pub fn contains_any(&self, module: &RethRpcModule) -> bool {
1592 self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
1593 }
1594
1595 pub fn contains_http(&self, module: &RethRpcModule) -> bool {
1597 self.http.as_ref().is_some_and(|http| http.contains(module))
1598 }
1599
1600 pub fn contains_ws(&self, module: &RethRpcModule) -> bool {
1602 self.ws.as_ref().is_some_and(|ws| ws.contains(module))
1603 }
1604
1605 pub fn contains_ipc(&self, module: &RethRpcModule) -> bool {
1607 self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
1608 }
1609
1610 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
1613 if RpcModuleSelection::are_identical(self.http.as_ref(), self.ws.as_ref()) {
1614 Ok(())
1615 } else {
1616 let http_modules =
1617 self.http.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1618 let ws_modules =
1619 self.ws.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1620
1621 let http_not_ws = http_modules.difference(&ws_modules).cloned().collect();
1622 let ws_not_http = ws_modules.difference(&http_modules).cloned().collect();
1623 let overlap = http_modules.intersection(&ws_modules).cloned().collect();
1624
1625 Err(WsHttpSamePortError::ConflictingModules(Box::new(ConflictingModules {
1626 overlap,
1627 http_not_ws,
1628 ws_not_http,
1629 })))
1630 }
1631 }
1632}
1633
1634#[derive(Debug, Clone, Default)]
1636pub struct TransportRpcModules<Context = ()> {
1637 config: TransportRpcModuleConfig,
1639 http: Option<RpcModule<Context>>,
1641 ws: Option<RpcModule<Context>>,
1643 ipc: Option<RpcModule<Context>>,
1645}
1646
1647impl TransportRpcModules {
1650 pub fn with_config(mut self, config: TransportRpcModuleConfig) -> Self {
1653 self.config = config;
1654 self
1655 }
1656
1657 pub fn with_http(mut self, http: RpcModule<()>) -> Self {
1660 self.http = Some(http);
1661 self
1662 }
1663
1664 pub fn with_ws(mut self, ws: RpcModule<()>) -> Self {
1667 self.ws = Some(ws);
1668 self
1669 }
1670
1671 pub fn with_ipc(mut self, ipc: RpcModule<()>) -> Self {
1674 self.ipc = Some(ipc);
1675 self
1676 }
1677
1678 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
1680 &self.config
1681 }
1682
1683 pub fn merge_if_module_configured(
1688 &mut self,
1689 module: RethRpcModule,
1690 other: impl Into<Methods>,
1691 ) -> Result<(), RegisterMethodError> {
1692 let other = other.into();
1693 if self.module_config().contains_http(&module) {
1694 self.merge_http(other.clone())?;
1695 }
1696 if self.module_config().contains_ws(&module) {
1697 self.merge_ws(other.clone())?;
1698 }
1699 if self.module_config().contains_ipc(&module) {
1700 self.merge_ipc(other)?;
1701 }
1702
1703 Ok(())
1704 }
1705
1706 pub fn merge_if_module_configured_with<F>(
1713 &mut self,
1714 module: RethRpcModule,
1715 f: F,
1716 ) -> Result<(), RegisterMethodError>
1717 where
1718 F: FnOnce() -> Methods,
1719 {
1720 if !self.module_config().contains_any(&module) {
1722 return Ok(());
1723 }
1724 self.merge_if_module_configured(module, f())
1725 }
1726
1727 pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1733 if let Some(ref mut http) = self.http {
1734 return http.merge(other.into()).map(|_| true)
1735 }
1736 Ok(false)
1737 }
1738
1739 pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1745 if let Some(ref mut ws) = self.ws {
1746 return ws.merge(other.into()).map(|_| true)
1747 }
1748 Ok(false)
1749 }
1750
1751 pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1757 if let Some(ref mut ipc) = self.ipc {
1758 return ipc.merge(other.into()).map(|_| true)
1759 }
1760 Ok(false)
1761 }
1762
1763 pub fn merge_configured(
1767 &mut self,
1768 other: impl Into<Methods>,
1769 ) -> Result<(), RegisterMethodError> {
1770 let other = other.into();
1771 self.merge_http(other.clone())?;
1772 self.merge_ws(other.clone())?;
1773 self.merge_ipc(other)?;
1774 Ok(())
1775 }
1776
1777 pub fn methods_by_module(&self, module: RethRpcModule) -> Methods {
1781 self.methods_by(|name| name.starts_with(module.as_str()))
1782 }
1783
1784 pub fn methods_by<F>(&self, mut filter: F) -> Methods
1788 where
1789 F: FnMut(&str) -> bool,
1790 {
1791 let mut methods = Methods::new();
1792
1793 let mut f =
1795 |name: &str, mm: &Methods| filter(name) && !mm.method_names().any(|m| m == name);
1796
1797 if let Some(m) = self.http_methods(|name| f(name, &methods)) {
1798 let _ = methods.merge(m);
1799 }
1800 if let Some(m) = self.ws_methods(|name| f(name, &methods)) {
1801 let _ = methods.merge(m);
1802 }
1803 if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
1804 let _ = methods.merge(m);
1805 }
1806 methods
1807 }
1808
1809 pub fn http_methods<F>(&self, filter: F) -> Option<Methods>
1813 where
1814 F: FnMut(&str) -> bool,
1815 {
1816 self.http.as_ref().map(|module| methods_by(module, filter))
1817 }
1818
1819 pub fn ws_methods<F>(&self, filter: F) -> Option<Methods>
1823 where
1824 F: FnMut(&str) -> bool,
1825 {
1826 self.ws.as_ref().map(|module| methods_by(module, filter))
1827 }
1828
1829 pub fn ipc_methods<F>(&self, filter: F) -> Option<Methods>
1833 where
1834 F: FnMut(&str) -> bool,
1835 {
1836 self.ipc.as_ref().map(|module| methods_by(module, filter))
1837 }
1838
1839 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
1847 if let Some(http_module) = &mut self.http {
1848 http_module.remove_method(method_name).is_some()
1849 } else {
1850 false
1851 }
1852 }
1853
1854 pub fn remove_http_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1856 for name in methods {
1857 self.remove_http_method(name);
1858 }
1859 }
1860
1861 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
1869 if let Some(ws_module) = &mut self.ws {
1870 ws_module.remove_method(method_name).is_some()
1871 } else {
1872 false
1873 }
1874 }
1875
1876 pub fn remove_ws_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1878 for name in methods {
1879 self.remove_ws_method(name);
1880 }
1881 }
1882
1883 pub fn remove_ipc_method(&mut self, method_name: &'static str) -> bool {
1891 if let Some(ipc_module) = &mut self.ipc {
1892 ipc_module.remove_method(method_name).is_some()
1893 } else {
1894 false
1895 }
1896 }
1897
1898 pub fn remove_ipc_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1900 for name in methods {
1901 self.remove_ipc_method(name);
1902 }
1903 }
1904
1905 pub fn remove_method_from_configured(&mut self, method_name: &'static str) -> bool {
1909 let http_removed = self.remove_http_method(method_name);
1910 let ws_removed = self.remove_ws_method(method_name);
1911 let ipc_removed = self.remove_ipc_method(method_name);
1912
1913 http_removed || ws_removed || ipc_removed
1914 }
1915
1916 pub fn rename(
1920 &mut self,
1921 old_name: &'static str,
1922 new_method: impl Into<Methods>,
1923 ) -> Result<(), RegisterMethodError> {
1924 self.remove_method_from_configured(old_name);
1926
1927 self.merge_configured(new_method)
1929 }
1930
1931 pub fn replace_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1938 let other = other.into();
1939 self.remove_http_methods(other.method_names());
1940 self.merge_http(other)
1941 }
1942
1943 pub fn replace_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1950 let other = other.into();
1951 self.remove_ipc_methods(other.method_names());
1952 self.merge_ipc(other)
1953 }
1954
1955 pub fn replace_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1962 let other = other.into();
1963 self.remove_ws_methods(other.method_names());
1964 self.merge_ws(other)
1965 }
1966
1967 pub fn replace_configured(
1971 &mut self,
1972 other: impl Into<Methods>,
1973 ) -> Result<bool, RegisterMethodError> {
1974 let other = other.into();
1975 self.replace_http(other.clone())?;
1976 self.replace_ws(other.clone())?;
1977 self.replace_ipc(other)?;
1978 Ok(true)
1979 }
1980
1981 pub fn add_or_replace_http(
1985 &mut self,
1986 other: impl Into<Methods>,
1987 ) -> Result<bool, RegisterMethodError> {
1988 let other = other.into();
1989 self.remove_http_methods(other.method_names());
1990 self.merge_http(other)
1991 }
1992
1993 pub fn add_or_replace_ws(
1997 &mut self,
1998 other: impl Into<Methods>,
1999 ) -> Result<bool, RegisterMethodError> {
2000 let other = other.into();
2001 self.remove_ws_methods(other.method_names());
2002 self.merge_ws(other)
2003 }
2004
2005 pub fn add_or_replace_ipc(
2009 &mut self,
2010 other: impl Into<Methods>,
2011 ) -> Result<bool, RegisterMethodError> {
2012 let other = other.into();
2013 self.remove_ipc_methods(other.method_names());
2014 self.merge_ipc(other)
2015 }
2016
2017 pub fn add_or_replace_configured(
2019 &mut self,
2020 other: impl Into<Methods>,
2021 ) -> Result<(), RegisterMethodError> {
2022 let other = other.into();
2023 self.add_or_replace_http(other.clone())?;
2024 self.add_or_replace_ws(other.clone())?;
2025 self.add_or_replace_ipc(other)?;
2026 Ok(())
2027 }
2028 pub fn add_or_replace_if_module_configured(
2031 &mut self,
2032 module: RethRpcModule,
2033 other: impl Into<Methods>,
2034 ) -> Result<(), RegisterMethodError> {
2035 let other = other.into();
2036 if self.module_config().contains_http(&module) {
2037 self.add_or_replace_http(other.clone())?;
2038 }
2039 if self.module_config().contains_ws(&module) {
2040 self.add_or_replace_ws(other.clone())?;
2041 }
2042 if self.module_config().contains_ipc(&module) {
2043 self.add_or_replace_ipc(other)?;
2044 }
2045 Ok(())
2046 }
2047}
2048
2049fn methods_by<T, F>(module: &RpcModule<T>, mut filter: F) -> Methods
2051where
2052 F: FnMut(&str) -> bool,
2053{
2054 let mut methods = Methods::new();
2055 let method_names = module.method_names().filter(|name| filter(name));
2056
2057 for name in method_names {
2058 if let Some(matched_method) = module.method(name).cloned() {
2059 let _ = methods.verify_and_insert(name, matched_method);
2060 }
2061 }
2062
2063 methods
2064}
2065
2066#[derive(Clone, Debug)]
2071#[must_use = "Server stops if dropped"]
2072pub struct RpcServerHandle {
2073 http_local_addr: Option<SocketAddr>,
2075 ws_local_addr: Option<SocketAddr>,
2076 http: Option<ServerHandle>,
2077 ws: Option<ServerHandle>,
2078 ipc_endpoint: Option<String>,
2079 ipc: Option<jsonrpsee::server::ServerHandle>,
2080 jwt_secret: Option<JwtSecret>,
2081}
2082
2083impl RpcServerHandle {
2086 fn bearer_token(&self) -> Option<String> {
2088 self.jwt_secret.as_ref().map(|secret| {
2089 format!(
2090 "Bearer {}",
2091 secret
2092 .encode(&Claims {
2093 iat: (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +
2094 Duration::from_secs(60))
2095 .as_secs(),
2096 exp: None,
2097 })
2098 .unwrap()
2099 )
2100 })
2101 }
2102 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
2104 self.http_local_addr
2105 }
2106
2107 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
2109 self.ws_local_addr
2110 }
2111
2112 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
2114 if let Some(handle) = self.http {
2115 handle.stop()?
2116 }
2117
2118 if let Some(handle) = self.ws {
2119 handle.stop()?
2120 }
2121
2122 if let Some(handle) = self.ipc {
2123 handle.stop()?
2124 }
2125
2126 Ok(())
2127 }
2128
2129 pub fn ipc_endpoint(&self) -> Option<String> {
2131 self.ipc_endpoint.clone()
2132 }
2133
2134 pub fn http_url(&self) -> Option<String> {
2136 self.http_local_addr.map(|addr| format!("http://{addr}"))
2137 }
2138
2139 pub fn ws_url(&self) -> Option<String> {
2141 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
2142 }
2143
2144 pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
2146 let url = self.http_url()?;
2147
2148 let client = if let Some(token) = self.bearer_token() {
2149 jsonrpsee::http_client::HttpClientBuilder::default()
2150 .set_headers(HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]))
2151 .build(url)
2152 } else {
2153 jsonrpsee::http_client::HttpClientBuilder::default().build(url)
2154 };
2155
2156 client.expect("failed to create http client").into()
2157 }
2158
2159 pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
2161 let url = self.ws_url()?;
2162 let mut builder = jsonrpsee::ws_client::WsClientBuilder::default();
2163
2164 if let Some(token) = self.bearer_token() {
2165 let headers = HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]);
2166 builder = builder.set_headers(headers);
2167 }
2168
2169 let client = builder.build(url).await.expect("failed to create ws client");
2170 Some(client)
2171 }
2172
2173 pub fn eth_http_provider(
2175 &self,
2176 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2177 self.new_http_provider_for()
2178 }
2179
2180 pub fn eth_http_provider_with_wallet<W>(
2183 &self,
2184 wallet: W,
2185 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2186 where
2187 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2188 {
2189 let rpc_url = self.http_url()?;
2190 let provider =
2191 ProviderBuilder::new().wallet(wallet).connect_http(rpc_url.parse().expect("valid url"));
2192 Some(provider)
2193 }
2194
2195 pub fn new_http_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2200 where
2201 N: RecommendedFillers<RecommendedFillers: Unpin>,
2202 {
2203 let rpc_url = self.http_url()?;
2204 let provider = ProviderBuilder::default()
2205 .with_recommended_fillers()
2206 .connect_http(rpc_url.parse().expect("valid url"));
2207 Some(provider)
2208 }
2209
2210 pub async fn eth_ws_provider(
2212 &self,
2213 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2214 self.new_ws_provider_for().await
2215 }
2216
2217 pub async fn eth_ws_provider_with_wallet<W>(
2220 &self,
2221 wallet: W,
2222 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2223 where
2224 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2225 {
2226 let rpc_url = self.ws_url()?;
2227 let provider = ProviderBuilder::new()
2228 .wallet(wallet)
2229 .connect(&rpc_url)
2230 .await
2231 .expect("failed to create ws client");
2232 Some(provider)
2233 }
2234
2235 pub async fn new_ws_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2240 where
2241 N: RecommendedFillers<RecommendedFillers: Unpin>,
2242 {
2243 let rpc_url = self.ws_url()?;
2244 let provider = ProviderBuilder::default()
2245 .with_recommended_fillers()
2246 .connect(&rpc_url)
2247 .await
2248 .expect("failed to create ws client");
2249 Some(provider)
2250 }
2251
2252 pub async fn eth_ipc_provider(
2254 &self,
2255 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2256 self.new_ipc_provider_for().await
2257 }
2258
2259 pub async fn new_ipc_provider_for<N>(
2264 &self,
2265 ) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2266 where
2267 N: RecommendedFillers<RecommendedFillers: Unpin>,
2268 {
2269 let rpc_url = self.ipc_endpoint()?;
2270 let provider = ProviderBuilder::default()
2271 .with_recommended_fillers()
2272 .connect(&rpc_url)
2273 .await
2274 .expect("failed to create ipc client");
2275 Some(provider)
2276 }
2277}
2278
2279#[cfg(test)]
2280mod tests {
2281 use super::*;
2282
2283 #[test]
2284 fn parse_eth_call_bundle_selection() {
2285 let selection = "eth,admin,debug".parse::<RpcModuleSelection>().unwrap();
2286 assert_eq!(
2287 selection,
2288 RpcModuleSelection::Selection(
2289 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Debug,].into()
2290 )
2291 );
2292 }
2293
2294 #[test]
2295 fn parse_rpc_module_selection() {
2296 let selection = "all".parse::<RpcModuleSelection>().unwrap();
2297 assert_eq!(selection, RpcModuleSelection::All);
2298 }
2299
2300 #[test]
2301 fn parse_rpc_module_selection_none() {
2302 let selection = "none".parse::<RpcModuleSelection>().unwrap();
2303 assert_eq!(selection, RpcModuleSelection::Selection(Default::default()));
2304 }
2305
2306 #[test]
2307 fn parse_rpc_unique_module_selection() {
2308 let selection = "eth,admin,eth,net".parse::<RpcModuleSelection>().unwrap();
2309 assert_eq!(
2310 selection,
2311 RpcModuleSelection::Selection(
2312 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Net,].into()
2313 )
2314 );
2315 }
2316
2317 #[test]
2318 fn identical_selection() {
2319 assert!(RpcModuleSelection::are_identical(
2320 Some(&RpcModuleSelection::All),
2321 Some(&RpcModuleSelection::All),
2322 ));
2323 assert!(!RpcModuleSelection::are_identical(
2324 Some(&RpcModuleSelection::All),
2325 Some(&RpcModuleSelection::Standard),
2326 ));
2327 assert!(RpcModuleSelection::are_identical(
2328 Some(&RpcModuleSelection::Selection(RpcModuleSelection::Standard.to_selection())),
2329 Some(&RpcModuleSelection::Standard),
2330 ));
2331 assert!(RpcModuleSelection::are_identical(
2332 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2333 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2334 ));
2335 assert!(RpcModuleSelection::are_identical(
2336 None,
2337 Some(&RpcModuleSelection::Selection(Default::default())),
2338 ));
2339 assert!(RpcModuleSelection::are_identical(
2340 Some(&RpcModuleSelection::Selection(Default::default())),
2341 None,
2342 ));
2343 assert!(RpcModuleSelection::are_identical(None, None));
2344 }
2345
2346 #[test]
2347 fn test_rpc_module_str() {
2348 macro_rules! assert_rpc_module {
2349 ($($s:expr => $v:expr,)*) => {
2350 $(
2351 let val: RethRpcModule = $s.parse().unwrap();
2352 assert_eq!(val, $v);
2353 assert_eq!(val.to_string(), $s);
2354 )*
2355 };
2356 }
2357 assert_rpc_module!
2358 (
2359 "admin" => RethRpcModule::Admin,
2360 "debug" => RethRpcModule::Debug,
2361 "eth" => RethRpcModule::Eth,
2362 "net" => RethRpcModule::Net,
2363 "trace" => RethRpcModule::Trace,
2364 "web3" => RethRpcModule::Web3,
2365 "rpc" => RethRpcModule::Rpc,
2366 "ots" => RethRpcModule::Ots,
2367 "reth" => RethRpcModule::Reth,
2368 );
2369 }
2370
2371 #[test]
2372 fn test_default_selection() {
2373 let selection = RpcModuleSelection::Standard.to_selection();
2374 assert_eq!(selection, [RethRpcModule::Eth, RethRpcModule::Net, RethRpcModule::Web3].into())
2375 }
2376
2377 #[test]
2378 fn test_create_rpc_module_config() {
2379 let selection = vec!["eth", "admin"];
2380 let config = RpcModuleSelection::try_from_selection(selection).unwrap();
2381 assert_eq!(
2382 config,
2383 RpcModuleSelection::Selection([RethRpcModule::Eth, RethRpcModule::Admin].into())
2384 );
2385 }
2386
2387 #[test]
2388 fn test_configure_transport_config() {
2389 let config = TransportRpcModuleConfig::default()
2390 .with_http([RethRpcModule::Eth, RethRpcModule::Admin]);
2391 assert_eq!(
2392 config,
2393 TransportRpcModuleConfig {
2394 http: Some(RpcModuleSelection::Selection(
2395 [RethRpcModule::Eth, RethRpcModule::Admin].into()
2396 )),
2397 ws: None,
2398 ipc: None,
2399 config: None,
2400 }
2401 )
2402 }
2403
2404 #[test]
2405 fn test_configure_transport_config_none() {
2406 let config = TransportRpcModuleConfig::default().with_http(Vec::<RethRpcModule>::new());
2407 assert_eq!(
2408 config,
2409 TransportRpcModuleConfig {
2410 http: Some(RpcModuleSelection::Selection(Default::default())),
2411 ws: None,
2412 ipc: None,
2413 config: None,
2414 }
2415 )
2416 }
2417
2418 fn create_test_module() -> RpcModule<()> {
2419 let mut module = RpcModule::new(());
2420 module.register_method("anything", |_, _, _| "succeed").unwrap();
2421 module
2422 }
2423
2424 #[test]
2425 fn test_remove_http_method() {
2426 let mut modules =
2427 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2428 assert!(modules.remove_http_method("anything"));
2430
2431 assert!(!modules.remove_http_method("non_existent_method"));
2433
2434 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2436 }
2437
2438 #[test]
2439 fn test_remove_ws_method() {
2440 let mut modules =
2441 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2442
2443 assert!(modules.remove_ws_method("anything"));
2445
2446 assert!(!modules.remove_ws_method("non_existent_method"));
2448
2449 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2451 }
2452
2453 #[test]
2454 fn test_remove_ipc_method() {
2455 let mut modules =
2456 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2457
2458 assert!(modules.remove_ipc_method("anything"));
2460
2461 assert!(!modules.remove_ipc_method("non_existent_method"));
2463
2464 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2466 }
2467
2468 #[test]
2469 fn test_remove_method_from_configured() {
2470 let mut modules = TransportRpcModules {
2471 http: Some(create_test_module()),
2472 ws: Some(create_test_module()),
2473 ipc: Some(create_test_module()),
2474 ..Default::default()
2475 };
2476
2477 assert!(modules.remove_method_from_configured("anything"));
2479
2480 assert!(!modules.remove_method_from_configured("anything"));
2482
2483 assert!(!modules.remove_method_from_configured("non_existent_method"));
2485
2486 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2488 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2489 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2490 }
2491
2492 #[test]
2493 fn test_transport_rpc_module_rename() {
2494 let mut modules = TransportRpcModules {
2495 http: Some(create_test_module()),
2496 ws: Some(create_test_module()),
2497 ipc: Some(create_test_module()),
2498 ..Default::default()
2499 };
2500
2501 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2503 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2504 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2505
2506 assert!(modules.http.as_ref().unwrap().method("something").is_none());
2508 assert!(modules.ws.as_ref().unwrap().method("something").is_none());
2509 assert!(modules.ipc.as_ref().unwrap().method("something").is_none());
2510
2511 let mut other_module = RpcModule::new(());
2513 other_module.register_method("something", |_, _, _| "fails").unwrap();
2514
2515 modules.rename("anything", other_module).expect("rename failed");
2517
2518 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2520 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2521 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2522
2523 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2525 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2526 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2527 }
2528
2529 #[test]
2530 fn test_replace_http_method() {
2531 let mut modules =
2532 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2533
2534 let mut other_module = RpcModule::new(());
2535 other_module.register_method("something", |_, _, _| "fails").unwrap();
2536
2537 assert!(modules.replace_http(other_module.clone()).unwrap());
2538
2539 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2540
2541 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2542 assert!(modules.replace_http(other_module.clone()).unwrap());
2543
2544 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2545 }
2546 #[test]
2547 fn test_replace_ipc_method() {
2548 let mut modules =
2549 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2550
2551 let mut other_module = RpcModule::new(());
2552 other_module.register_method("something", |_, _, _| "fails").unwrap();
2553
2554 assert!(modules.replace_ipc(other_module.clone()).unwrap());
2555
2556 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2557
2558 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2559 assert!(modules.replace_ipc(other_module.clone()).unwrap());
2560
2561 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2562 }
2563 #[test]
2564 fn test_replace_ws_method() {
2565 let mut modules =
2566 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2567
2568 let mut other_module = RpcModule::new(());
2569 other_module.register_method("something", |_, _, _| "fails").unwrap();
2570
2571 assert!(modules.replace_ws(other_module.clone()).unwrap());
2572
2573 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2574
2575 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2576 assert!(modules.replace_ws(other_module.clone()).unwrap());
2577
2578 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2579 }
2580
2581 #[test]
2582 fn test_replace_configured() {
2583 let mut modules = TransportRpcModules {
2584 http: Some(create_test_module()),
2585 ws: Some(create_test_module()),
2586 ipc: Some(create_test_module()),
2587 ..Default::default()
2588 };
2589 let mut other_module = RpcModule::new(());
2590 other_module.register_method("something", |_, _, _| "fails").unwrap();
2591
2592 assert!(modules.replace_configured(other_module).unwrap());
2593
2594 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2596 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2597 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2598
2599 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2600 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2601 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2602 }
2603
2604 #[test]
2605 fn test_add_or_replace_if_module_configured() {
2606 let config = TransportRpcModuleConfig::default()
2608 .with_http([RethRpcModule::Eth])
2609 .with_ws([RethRpcModule::Eth]);
2610
2611 let mut http_module = RpcModule::new(());
2613 http_module.register_method("eth_existing", |_, _, _| "original").unwrap();
2614
2615 let mut ws_module = RpcModule::new(());
2617 ws_module.register_method("eth_existing", |_, _, _| "original").unwrap();
2618
2619 let ipc_module = RpcModule::new(());
2621
2622 let mut modules = TransportRpcModules {
2624 config,
2625 http: Some(http_module),
2626 ws: Some(ws_module),
2627 ipc: Some(ipc_module),
2628 };
2629
2630 let mut new_module = RpcModule::new(());
2632 new_module.register_method("eth_existing", |_, _, _| "replaced").unwrap(); new_module.register_method("eth_new", |_, _, _| "added").unwrap(); let new_methods: Methods = new_module.into();
2635
2636 let result = modules.add_or_replace_if_module_configured(RethRpcModule::Eth, new_methods);
2638 assert!(result.is_ok(), "Function should succeed");
2639
2640 let http = modules.http.as_ref().unwrap();
2642 assert!(http.method("eth_existing").is_some());
2643 assert!(http.method("eth_new").is_some());
2644
2645 let ws = modules.ws.as_ref().unwrap();
2647 assert!(ws.method("eth_existing").is_some());
2648 assert!(ws.method("eth_new").is_some());
2649
2650 let ipc = modules.ipc.as_ref().unwrap();
2652 assert!(ipc.method("eth_existing").is_none());
2653 assert!(ipc.method("eth_new").is_none());
2654 }
2655
2656 #[test]
2657 fn test_merge_if_module_configured_with_lazy_evaluation() {
2658 let config = TransportRpcModuleConfig::default().with_http([RethRpcModule::Eth]);
2660
2661 let mut modules =
2662 TransportRpcModules { config, http: Some(RpcModule::new(())), ws: None, ipc: None };
2663
2664 let mut closure_called = false;
2666
2667 let result = modules.merge_if_module_configured_with(RethRpcModule::Eth, || {
2669 closure_called = true;
2670 let mut methods = RpcModule::new(());
2671 methods.register_method("eth_test", |_, _, _| "test").unwrap();
2672 methods.into()
2673 });
2674
2675 assert!(result.is_ok());
2676 assert!(closure_called, "Closure should be called when module is configured");
2677 assert!(modules.http.as_ref().unwrap().method("eth_test").is_some());
2678
2679 closure_called = false;
2681 let result = modules.merge_if_module_configured_with(RethRpcModule::Debug, || {
2682 closure_called = true;
2683 RpcModule::new(()).into()
2684 });
2685
2686 assert!(result.is_ok());
2687 assert!(!closure_called, "Closure should NOT be called when module is not configured");
2688 }
2689}