1#![doc(
15 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
16 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
17 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
18)]
19#![cfg_attr(not(test), warn(unused_crate_dependencies))]
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
23use alloy_network::{Ethereum, IntoWallet};
24use alloy_provider::{fillers::RecommendedFillers, Provider, ProviderBuilder};
25use core::marker::PhantomData;
26use error::{ConflictingModules, RpcError, ServerKind};
27use http::{header::AUTHORIZATION, HeaderMap};
28use jsonrpsee::{
29 core::RegisterMethodError,
30 server::{middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider, ServerHandle},
31 Methods, RpcModule,
32};
33use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
34use reth_consensus::FullConsensus;
35use reth_engine_primitives::{ConsensusEngineEvent, ConsensusEngineHandle};
36use reth_evm::ConfigureEvm;
37use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
38use reth_payload_primitives::PayloadTypes;
39use reth_primitives_traits::{NodePrimitives, TxTy};
40use reth_rpc::{
41 AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
42 OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
43};
44use reth_rpc_api::servers::*;
45use reth_rpc_engine_api::RethEngineApi;
46use reth_rpc_eth_api::{
47 helpers::{
48 pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
49 TraceExt,
50 },
51 node::RpcNodeCoreAdapter,
52 EthApiServer, EthApiTypes, FullEthApiServer, FullEthApiTypes, RpcBlock, RpcConvert,
53 RpcConverter, RpcHeader, RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
54};
55use reth_rpc_eth_types::{receipt::EthReceiptConverter, EthConfig, EthSubscriptionIdProvider};
56use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
57pub use reth_rpc_server_types::RethRpcModule;
58use reth_storage_api::{
59 AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, NodePrimitivesProvider,
60 StateProviderFactory,
61};
62use reth_tasks::{pool::BlockingTaskGuard, Runtime};
63use reth_tokio_util::EventSender;
64use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
65use serde::{Deserialize, Serialize};
66use std::{
67 collections::HashMap,
68 fmt::Debug,
69 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
70 time::{Duration, SystemTime, UNIX_EPOCH},
71};
72use tower_http::cors::CorsLayer;
73
74pub use cors::CorsDomainError;
75
76pub use jsonrpsee::server::ServerBuilder;
78use jsonrpsee::server::ServerConfigBuilder;
79pub use reth_ipc::server::{
80 Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
81};
82pub use reth_rpc_server_types::{constants, RpcModuleSelection};
83pub use tower::layer::util::{Identity, Stack};
84
85pub mod auth;
87
88pub mod config;
90
91pub mod middleware;
93
94mod cors;
96
97pub mod error;
99
100pub mod eth;
102pub use eth::EthHandlers;
103
104mod metrics;
106use crate::middleware::RethRpcMiddleware;
107pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
108use reth_chain_state::{
109 CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions,
110};
111use reth_rpc::eth::sim_bundle::EthSimBundle;
112
113pub mod rate_limiter;
115
/// Builder that gathers the components required to assemble the RPC modules.
///
/// Every component is its own type parameter, so each one can be replaced independently through
/// the `with_*` methods. `N` is carried only at the type level via [`PhantomData`].
#[derive(Debug, Clone)]
pub struct RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus> {
    /// Provider component.
    provider: Provider,
    /// Transaction pool component.
    pool: Pool,
    /// Network component.
    network: Network,
    /// Runtime used as executor; `None` when the builder was created through [`Default`].
    executor: Option<Runtime>,
    /// EVM configuration component.
    evm_config: EvmConfig,
    /// Consensus component.
    consensus: Consensus,
    /// Marker for the primitives type `N`; stores no data.
    _primitives: PhantomData<N>,
}
136
137impl<N, Provider, Pool, Network, EvmConfig, Consensus>
140 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
141{
142 pub const fn new(
144 provider: Provider,
145 pool: Pool,
146 network: Network,
147 executor: Runtime,
148 evm_config: EvmConfig,
149 consensus: Consensus,
150 ) -> Self {
151 Self {
152 provider,
153 pool,
154 network,
155 executor: Some(executor),
156 evm_config,
157 consensus,
158 _primitives: PhantomData,
159 }
160 }
161
162 pub fn with_provider<P>(
164 self,
165 provider: P,
166 ) -> RpcModuleBuilder<N, P, Pool, Network, EvmConfig, Consensus> {
167 let Self { pool, network, executor, evm_config, consensus, _primitives, .. } = self;
168 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
169 }
170
171 pub fn with_pool<P>(
173 self,
174 pool: P,
175 ) -> RpcModuleBuilder<N, Provider, P, Network, EvmConfig, Consensus> {
176 let Self { provider, network, executor, evm_config, consensus, _primitives, .. } = self;
177 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
178 }
179
180 pub fn with_noop_pool(
186 self,
187 ) -> RpcModuleBuilder<N, Provider, NoopTransactionPool, Network, EvmConfig, Consensus> {
188 let Self { provider, executor, network, evm_config, consensus, _primitives, .. } = self;
189 RpcModuleBuilder {
190 provider,
191 executor,
192 network,
193 evm_config,
194 pool: NoopTransactionPool::default(),
195 consensus,
196 _primitives,
197 }
198 }
199
200 pub fn with_network<Net>(
202 self,
203 network: Net,
204 ) -> RpcModuleBuilder<N, Provider, Pool, Net, EvmConfig, Consensus> {
205 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
206 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
207 }
208
209 pub fn with_noop_network(
215 self,
216 ) -> RpcModuleBuilder<N, Provider, Pool, NoopNetwork, EvmConfig, Consensus> {
217 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
218 RpcModuleBuilder {
219 provider,
220 pool,
221 executor,
222 network: NoopNetwork::default(),
223 evm_config,
224 consensus,
225 _primitives,
226 }
227 }
228
229 pub fn with_executor(self, executor: Runtime) -> Self {
231 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
232 Self {
233 provider,
234 network,
235 pool,
236 executor: Some(executor),
237 evm_config,
238 consensus,
239 _primitives,
240 }
241 }
242
243 pub fn with_evm_config<E>(
245 self,
246 evm_config: E,
247 ) -> RpcModuleBuilder<N, Provider, Pool, Network, E, Consensus> {
248 let Self { provider, pool, executor, network, consensus, _primitives, .. } = self;
249 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
250 }
251
252 pub fn with_consensus<C>(
254 self,
255 consensus: C,
256 ) -> RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, C> {
257 let Self { provider, network, pool, executor, evm_config, _primitives, .. } = self;
258 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
259 }
260
261 #[expect(clippy::type_complexity)]
263 pub fn eth_api_builder<ChainSpec>(
264 &self,
265 ) -> EthApiBuilder<
266 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
267 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
268 >
269 where
270 Provider: Clone,
271 Pool: Clone,
272 Network: Clone,
273 EvmConfig: Clone,
274 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
275 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
276 {
277 EthApiBuilder::new(
278 self.provider.clone(),
279 self.pool.clone(),
280 self.network.clone(),
281 self.evm_config.clone(),
282 )
283 }
284
285 #[expect(clippy::type_complexity)]
291 pub fn bootstrap_eth_api<ChainSpec>(
292 &self,
293 ) -> EthApi<
294 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
295 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
296 >
297 where
298 Provider: Clone,
299 Pool: Clone,
300 Network: Clone,
301 EvmConfig: ConfigureEvm + Clone,
302 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
303 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
304 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>: RpcConvert,
305 (): PendingEnvBuilder<EvmConfig>,
306 {
307 self.eth_api_builder().build()
308 }
309}
310
311impl<N, Provider, Pool, Network, EvmConfig, Consensus>
312 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
313where
314 N: NodePrimitives,
315 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
316 + CanonStateSubscriptions<Primitives = N>
317 + ForkChoiceSubscriptions<Header = N::BlockHeader>
318 + PersistedBlockSubscriptions
319 + AccountReader
320 + ChangeSetReader,
321 Pool: TransactionPool + Clone + 'static,
322 Network: NetworkInfo + Peers + Clone + 'static,
323 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
324 Consensus: FullConsensus<N> + Clone + 'static,
325{
326 pub fn build_with_auth_server<EthApi, Payload>(
333 self,
334 module_config: TransportRpcModuleConfig,
335 engine: impl IntoEngineApiRpcModule,
336 eth: EthApi,
337 engine_events: EventSender<ConsensusEngineEvent<N>>,
338 beacon_engine_handle: ConsensusEngineHandle<Payload>,
339 ) -> (
340 TransportRpcModules,
341 AuthRpcModule,
342 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>,
343 )
344 where
345 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
346 Payload: PayloadTypes,
347 {
348 let config = module_config.config.clone().unwrap_or_default();
349
350 let mut registry = self.into_registry(config, eth, engine_events);
351 let modules = registry.create_transport_rpc_modules(module_config);
352 let auth_module = registry.create_auth_module(engine, beacon_engine_handle);
353
354 (modules, auth_module, registry)
355 }
356
357 pub fn into_registry<EthApi>(
362 self,
363 config: RpcModuleConfig,
364 eth: EthApi,
365 engine_events: EventSender<ConsensusEngineEvent<N>>,
366 ) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
367 where
368 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
369 {
370 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
371 let executor =
372 executor.expect("RpcModuleBuilder requires a Runtime to be set via `with_executor`");
373 RpcRegistryInner::new(
374 provider,
375 pool,
376 network,
377 executor,
378 consensus,
379 config,
380 evm_config,
381 eth,
382 engine_events,
383 )
384 }
385
386 pub fn build<EthApi>(
389 self,
390 module_config: TransportRpcModuleConfig,
391 eth: EthApi,
392 engine_events: EventSender<ConsensusEngineEvent<N>>,
393 ) -> TransportRpcModules<()>
394 where
395 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
396 {
397 if module_config.is_empty() {
398 TransportRpcModules::default()
399 } else {
400 let config = module_config.config.clone().unwrap_or_default();
401 let mut registry = self.into_registry(config, eth, engine_events);
402 registry.create_transport_rpc_modules(module_config)
403 }
404 }
405}
406
407impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
408 fn default() -> Self {
409 Self {
410 provider: (),
411 pool: (),
412 network: (),
413 executor: None,
414 evm_config: (),
415 consensus: (),
416 _primitives: PhantomData,
417 }
418 }
419}
420
/// Configuration shared by the RPC modules; currently it only wraps an [`EthConfig`].
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RpcModuleConfig {
    /// Configuration for the `eth` handlers.
    eth: EthConfig,
}
427
428impl RpcModuleConfig {
431 pub fn builder() -> RpcModuleConfigBuilder {
433 RpcModuleConfigBuilder::default()
434 }
435
436 pub const fn new(eth: EthConfig) -> Self {
438 Self { eth }
439 }
440
441 pub const fn eth(&self) -> &EthConfig {
443 &self.eth
444 }
445
446 pub const fn eth_mut(&mut self) -> &mut EthConfig {
448 &mut self.eth
449 }
450}
451
/// Builder for [`RpcModuleConfig`].
#[derive(Clone, Debug, Default)]
pub struct RpcModuleConfigBuilder {
    /// Optional [`EthConfig`]; the default is used on `build` when this is `None`.
    eth: Option<EthConfig>,
}
457
458impl RpcModuleConfigBuilder {
461 pub fn eth(mut self, eth: EthConfig) -> Self {
463 self.eth = Some(eth);
464 self
465 }
466
467 pub fn build(self) -> RpcModuleConfig {
469 let Self { eth } = self;
470 RpcModuleConfig { eth: eth.unwrap_or_default() }
471 }
472
473 pub const fn get_eth(&self) -> Option<&EthConfig> {
475 self.eth.as_ref()
476 }
477
478 pub const fn eth_mut(&mut self) -> &mut Option<EthConfig> {
480 &mut self.eth
481 }
482
483 pub fn eth_mut_or_default(&mut self) -> &mut EthConfig {
485 self.eth.get_or_insert_with(EthConfig::default)
486 }
487}
488
/// Registry that holds the node components and the RPC method sets built from them.
#[derive(Debug)]
pub struct RpcRegistryInner<Provider, Pool, Network, EthApi: EthApiTypes, EvmConfig, Consensus> {
    /// Provider component.
    provider: Provider,
    /// Transaction pool component.
    pool: Pool,
    /// Network component.
    network: Network,
    /// Runtime handed to the handlers that need an executor.
    executor: Runtime,
    /// EVM configuration component.
    evm_config: EvmConfig,
    /// Consensus component.
    consensus: Consensus,
    /// The `eth` handlers (api, filter, pubsub) bootstrapped in `new`.
    eth: EthHandlers<EthApi>,
    /// Guard shared by the handlers that are constructed with it; created from
    /// `max_tracing_requests` of the eth config.
    blocking_pool_guard: BlockingTaskGuard,
    /// Method sets that have already been built, keyed by namespace.
    modules: HashMap<RethRpcModule, Methods>,
    /// The eth configuration this registry was created with.
    eth_config: EthConfig,
    /// Sender for consensus engine events; new listeners are created from it for the debug API.
    engine_events:
        EventSender<ConsensusEngineEvent<<EthApi::RpcConvert as RpcConvert>::Primitives>>,
}
510
511impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
514 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
515where
516 N: NodePrimitives,
517 Provider: StateProviderFactory
518 + CanonStateSubscriptions<Primitives = N>
519 + BlockReader<Block = N::Block, Receipt = N::Receipt>
520 + Clone
521 + Unpin
522 + 'static,
523 Pool: Send + Sync + Clone + 'static,
524 Network: Clone + 'static,
525 EthApi: FullEthApiTypes + 'static,
526 EvmConfig: ConfigureEvm<Primitives = N>,
527{
528 #[expect(clippy::too_many_arguments)]
530 pub fn new(
531 provider: Provider,
532 pool: Pool,
533 network: Network,
534 executor: Runtime,
535 consensus: Consensus,
536 config: RpcModuleConfig,
537 evm_config: EvmConfig,
538 eth_api: EthApi,
539 engine_events: EventSender<
540 ConsensusEngineEvent<<EthApi::Provider as NodePrimitivesProvider>::Primitives>,
541 >,
542 ) -> Self
543 where
544 EvmConfig: ConfigureEvm<Primitives = N>,
545 {
546 let blocking_pool_guard = BlockingTaskGuard::new(config.eth.max_tracing_requests);
547
548 let eth = EthHandlers::bootstrap(config.eth.clone(), executor.clone(), eth_api);
549
550 Self {
551 provider,
552 pool,
553 network,
554 eth,
555 executor,
556 consensus,
557 modules: Default::default(),
558 blocking_pool_guard,
559 eth_config: config.eth,
560 evm_config,
561 engine_events,
562 }
563 }
564}
565
566impl<Provider, Pool, Network, EthApi, Evm, Consensus>
567 RpcRegistryInner<Provider, Pool, Network, EthApi, Evm, Consensus>
568where
569 EthApi: EthApiTypes,
570{
571 pub const fn eth_api(&self) -> &EthApi {
573 &self.eth.api
574 }
575
576 pub const fn eth_handlers(&self) -> &EthHandlers<EthApi> {
578 &self.eth
579 }
580
581 pub const fn pool(&self) -> &Pool {
583 &self.pool
584 }
585
586 pub const fn tasks(&self) -> &Runtime {
588 &self.executor
589 }
590
591 pub const fn provider(&self) -> &Provider {
593 &self.provider
594 }
595
596 pub const fn evm_config(&self) -> &Evm {
598 &self.evm_config
599 }
600
601 pub fn methods(&self) -> Vec<Methods> {
603 self.modules.values().cloned().collect()
604 }
605
606 pub fn module(&self) -> RpcModule<()> {
608 let mut module = RpcModule::new(());
609 for methods in self.modules.values().cloned() {
610 module.merge(methods).expect("No conflicts");
611 }
612 module
613 }
614}
615
616impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
617 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
618where
619 Network: NetworkInfo + Clone + 'static,
620 EthApi: EthApiTypes,
621 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks>,
622 EvmConfig: ConfigureEvm,
623{
624 pub fn admin_api(&self) -> AdminApi<Network, Provider::ChainSpec, Pool>
626 where
627 Network: Peers,
628 Pool: TransactionPool + Clone + 'static,
629 {
630 AdminApi::new(self.network.clone(), self.provider.chain_spec(), self.pool.clone())
631 }
632
633 pub fn web3_api(&self) -> Web3Api<Network> {
635 Web3Api::new(self.network.clone())
636 }
637
638 pub fn register_admin(&mut self) -> &mut Self
640 where
641 Network: Peers,
642 Pool: TransactionPool + Clone + 'static,
643 {
644 let adminapi = self.admin_api();
645 self.modules.insert(RethRpcModule::Admin, adminapi.into_rpc().into());
646 self
647 }
648
649 pub fn register_web3(&mut self) -> &mut Self {
651 let web3api = self.web3_api();
652 self.modules.insert(RethRpcModule::Web3, web3api.into_rpc().into());
653 self
654 }
655}
656
657impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
658 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
659where
660 N: NodePrimitives,
661 Provider: FullRpcProvider<
662 Header = N::BlockHeader,
663 Block = N::Block,
664 Receipt = N::Receipt,
665 Transaction = N::SignedTx,
666 > + AccountReader
667 + ChangeSetReader
668 + CanonStateSubscriptions<Primitives = N>
669 + ForkChoiceSubscriptions<Header = N::BlockHeader>
670 + PersistedBlockSubscriptions,
671 Network: NetworkInfo + Peers + Clone + 'static,
672 EthApi: EthApiServer<
673 RpcTxReq<EthApi::NetworkTypes>,
674 RpcTransaction<EthApi::NetworkTypes>,
675 RpcBlock<EthApi::NetworkTypes>,
676 RpcReceipt<EthApi::NetworkTypes>,
677 RpcHeader<EthApi::NetworkTypes>,
678 TxTy<N>,
679 > + EthApiTypes,
680 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
681{
682 pub fn register_eth(&mut self) -> &mut Self {
688 let eth_api = self.eth_api().clone();
689 self.modules.insert(RethRpcModule::Eth, eth_api.into_rpc().into());
690 self
691 }
692
693 pub fn register_ots(&mut self) -> &mut Self
699 where
700 EthApi: TraceExt + EthTransactions<Primitives = N>,
701 {
702 let otterscan_api = self.otterscan_api();
703 self.modules.insert(RethRpcModule::Ots, otterscan_api.into_rpc().into());
704 self
705 }
706
707 pub fn register_debug(&mut self) -> &mut Self
713 where
714 EthApi: EthTransactions + TraceExt,
715 {
716 let debug_api = self.debug_api();
717 self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
718 self
719 }
720
721 pub fn register_trace(&mut self) -> &mut Self
727 where
728 EthApi: TraceExt,
729 {
730 let trace_api = self.trace_api();
731 self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
732 self
733 }
734
735 pub fn register_net(&mut self) -> &mut Self
743 where
744 EthApi: EthApiSpec + 'static,
745 {
746 let netapi = self.net_api();
747 self.modules.insert(RethRpcModule::Net, netapi.into_rpc().into());
748 self
749 }
750
751 pub fn register_reth(&mut self) -> &mut Self {
759 let rethapi = self.reth_api();
760 self.modules.insert(RethRpcModule::Reth, rethapi.into_rpc().into());
761 self
762 }
763
764 pub fn otterscan_api(&self) -> OtterscanApi<EthApi> {
770 let eth_api = self.eth_api().clone();
771 OtterscanApi::new(eth_api)
772 }
773}
774
775impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
776 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
777where
778 N: NodePrimitives,
779 Provider: FullRpcProvider<
780 Block = N::Block,
781 Header = N::BlockHeader,
782 Transaction = N::SignedTx,
783 Receipt = N::Receipt,
784 > + AccountReader
785 + ChangeSetReader,
786 Network: NetworkInfo + Peers + Clone + 'static,
787 EthApi: EthApiTypes,
788 EvmConfig: ConfigureEvm<Primitives = N>,
789{
790 pub fn trace_api(&self) -> TraceApi<EthApi> {
796 TraceApi::new(
797 self.eth_api().clone(),
798 self.blocking_pool_guard.clone(),
799 self.eth_config.clone(),
800 )
801 }
802
803 pub fn bundle_api(&self) -> EthBundle<EthApi>
809 where
810 EthApi: EthTransactions + LoadPendingBlock + Call,
811 {
812 let eth_api = self.eth_api().clone();
813 EthBundle::new(eth_api, self.blocking_pool_guard.clone())
814 }
815
816 pub fn debug_api(&self) -> DebugApi<EthApi>
822 where
823 EthApi: FullEthApiTypes,
824 {
825 DebugApi::new(
826 self.eth_api().clone(),
827 self.blocking_pool_guard.clone(),
828 self.tasks(),
829 self.engine_events.new_listener(),
830 )
831 }
832
833 pub fn net_api(&self) -> NetApi<Network, EthApi>
839 where
840 EthApi: EthApiSpec + 'static,
841 {
842 let eth_api = self.eth_api().clone();
843 NetApi::new(self.network.clone(), eth_api)
844 }
845
846 pub fn reth_api(&self) -> RethApi<Provider, EvmConfig> {
848 RethApi::new(
849 self.provider.clone(),
850 self.evm_config.clone(),
851 self.blocking_pool_guard.clone(),
852 self.executor.clone(),
853 )
854 }
855}
856
857impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
858 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
859where
860 N: NodePrimitives,
861 Provider: FullRpcProvider<Block = N::Block>
862 + CanonStateSubscriptions<Primitives = N>
863 + ForkChoiceSubscriptions<Header = N::BlockHeader>
864 + PersistedBlockSubscriptions
865 + AccountReader
866 + ChangeSetReader,
867 Pool: TransactionPool + Clone + 'static,
868 Network: NetworkInfo + Peers + Clone + 'static,
869 EthApi: FullEthApiServer,
870 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
871 Consensus: FullConsensus<N> + Clone + 'static,
872{
873 pub fn create_auth_module<Payload>(
880 &self,
881 engine_api: impl IntoEngineApiRpcModule,
882 beacon_engine_handle: ConsensusEngineHandle<Payload>,
883 ) -> AuthRpcModule
884 where
885 Payload: PayloadTypes,
886 {
887 let mut module = engine_api.into_rpc_module();
888
889 let reth_engine_api = RethEngineApi::new(beacon_engine_handle);
891 module
892 .merge(RethEngineApiServer::into_rpc(reth_engine_api).remove_context())
893 .expect("No conflicting methods");
894
895 let eth_handlers = self.eth_handlers();
897 let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
898
899 module.merge(engine_eth.into_rpc()).expect("No conflicting methods");
900
901 AuthRpcModule { inner: module }
902 }
903
904 fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
906 config.map(|config| self.module_for(config))
907 }
908
909 pub fn create_transport_rpc_modules(
913 &mut self,
914 config: TransportRpcModuleConfig,
915 ) -> TransportRpcModules<()> {
916 let mut modules = TransportRpcModules::default();
917 let http = self.maybe_module(config.http.as_ref());
918 let ws = self.maybe_module(config.ws.as_ref());
919 let ipc = self.maybe_module(config.ipc.as_ref());
920
921 modules.config = config;
922 modules.http = http;
923 modules.ws = ws;
924 modules.ipc = ipc;
925 modules
926 }
927
928 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
931 let mut module = RpcModule::new(());
932 let all_methods = self.reth_methods(config.iter_selection());
933 for methods in all_methods {
934 module.merge(methods).expect("No conflicts");
935 }
936 module
937 }
938
939 pub fn reth_methods(
948 &mut self,
949 namespaces: impl Iterator<Item = RethRpcModule>,
950 ) -> Vec<Methods> {
951 let EthHandlers { api: eth_api, filter: eth_filter, pubsub: eth_pubsub, .. } =
952 self.eth_handlers().clone();
953
954 let namespaces: Vec<_> = namespaces.collect();
956 namespaces
957 .iter()
958 .map(|namespace| {
959 self.modules
960 .entry(namespace.clone())
961 .or_insert_with(|| match namespace.clone() {
962 RethRpcModule::Admin => AdminApi::new(
963 self.network.clone(),
964 self.provider.chain_spec(),
965 self.pool.clone(),
966 )
967 .into_rpc()
968 .into(),
969 RethRpcModule::Debug => DebugApi::new(
970 eth_api.clone(),
971 self.blocking_pool_guard.clone(),
972 &self.executor,
973 self.engine_events.new_listener(),
974 )
975 .into_rpc()
976 .into(),
977 RethRpcModule::Eth => {
978 let mut module = eth_api.clone().into_rpc();
980 module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
981 module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
982 module
983 .merge(
984 EthBundle::new(
985 eth_api.clone(),
986 self.blocking_pool_guard.clone(),
987 )
988 .into_rpc(),
989 )
990 .expect("No conflicts");
991
992 module.into()
993 }
994 RethRpcModule::Net => {
995 NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
996 }
997 RethRpcModule::Trace => TraceApi::new(
998 eth_api.clone(),
999 self.blocking_pool_guard.clone(),
1000 self.eth_config.clone(),
1001 )
1002 .into_rpc()
1003 .into(),
1004 RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
1005 RethRpcModule::Txpool => TxPoolApi::new(
1006 self.eth.api.pool().clone(),
1007 dyn_clone::clone(self.eth.api.converter()),
1008 )
1009 .into_rpc()
1010 .into(),
1011 RethRpcModule::Rpc => RPCApi::new(
1012 namespaces
1013 .iter()
1014 .map(|module| (module.to_string(), "1.0".to_string()))
1015 .collect(),
1016 )
1017 .into_rpc()
1018 .into(),
1019 RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
1020 RethRpcModule::Reth => RethApi::new(
1021 self.provider.clone(),
1022 self.evm_config.clone(),
1023 self.blocking_pool_guard.clone(),
1024 self.executor.clone(),
1025 )
1026 .into_rpc()
1027 .into(),
1028 RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
1029 RethRpcModule::Mev => {
1030 EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
1031 .into_rpc()
1032 .into()
1033 }
1034 RethRpcModule::Flashbots |
1038 RethRpcModule::Testing |
1039 RethRpcModule::Other(_) => Default::default(),
1040 })
1041 .clone()
1042 })
1043 .collect::<Vec<_>>()
1044 }
1045}
1046
1047impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus> Clone
1048 for RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
1049where
1050 EthApi: EthApiTypes,
1051 Provider: Clone,
1052 Pool: Clone,
1053 Network: Clone,
1054 EvmConfig: Clone,
1055 Consensus: Clone,
1056{
1057 fn clone(&self) -> Self {
1058 Self {
1059 provider: self.provider.clone(),
1060 pool: self.pool.clone(),
1061 network: self.network.clone(),
1062 executor: self.executor.clone(),
1063 evm_config: self.evm_config.clone(),
1064 consensus: self.consensus.clone(),
1065 eth: self.eth.clone(),
1066 blocking_pool_guard: self.blocking_pool_guard.clone(),
1067 modules: self.modules.clone(),
1068 eth_config: self.eth_config.clone(),
1069 engine_events: self.engine_events.clone(),
1070 }
1071 }
1072}
1073
/// Configuration for the HTTP, WS and IPC RPC servers.
///
/// `RpcMiddleware` is the type of the RPC middleware; it defaults to [`Identity`].
#[derive(Debug)]
pub struct RpcServerConfig<RpcMiddleware = Identity> {
    /// Config builder for the HTTP server, if HTTP is configured.
    http_server_config: Option<ServerConfigBuilder>,
    /// CORS domains for HTTP, as a single string.
    http_cors_domains: Option<String>,
    /// Address for the HTTP server; `start` falls back to localhost with
    /// `constants::DEFAULT_HTTP_RPC_PORT` when unset.
    http_addr: Option<SocketAddr>,
    /// When `true`, no compression layer is created for HTTP.
    http_disable_compression: bool,
    /// Config builder for the WS server, if WS is configured.
    ws_server_config: Option<ServerConfigBuilder>,
    /// CORS domains for WS, as a single string.
    ws_cors_domains: Option<String>,
    /// Address for the WS server; `start` falls back to localhost with
    /// `constants::DEFAULT_WS_RPC_PORT` when unset.
    ws_addr: Option<SocketAddr>,
    /// Builder for the IPC server, if IPC is configured.
    ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
    /// IPC endpoint; `start` falls back to `constants::DEFAULT_IPC_ENDPOINT` when unset.
    ipc_endpoint: Option<String>,
    /// Optional JWT secret.
    jwt_secret: Option<JwtSecret>,
    /// Whether RPC request metrics are enabled; `true` in the default config.
    rpc_metrics_enabled: bool,
    /// The configured RPC middleware.
    rpc_middleware: RpcMiddleware,
}
1112
1113impl Default for RpcServerConfig<Identity> {
1116 fn default() -> Self {
1118 Self {
1119 http_server_config: None,
1120 http_cors_domains: None,
1121 http_addr: None,
1122 http_disable_compression: false,
1123 ws_server_config: None,
1124 ws_cors_domains: None,
1125 ws_addr: None,
1126 ipc_server_config: None,
1127 ipc_endpoint: None,
1128 jwt_secret: None,
1129 rpc_metrics_enabled: true,
1130 rpc_middleware: Default::default(),
1131 }
1132 }
1133}
1134
1135impl RpcServerConfig {
1136 pub fn http(config: ServerConfigBuilder) -> Self {
1138 Self::default().with_http(config)
1139 }
1140
1141 pub fn ws(config: ServerConfigBuilder) -> Self {
1143 Self::default().with_ws(config)
1144 }
1145
1146 pub fn ipc(config: IpcServerBuilder<Identity, Identity>) -> Self {
1148 Self::default().with_ipc(config)
1149 }
1150
1151 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
1156 self.http_server_config =
1157 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1158 self
1159 }
1160
1161 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
1166 self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1167 self
1168 }
1169
1170 pub fn with_ipc(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
1175 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1176 self
1177 }
1178}
1179
1180impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
1181 pub fn set_rpc_middleware<T>(self, rpc_middleware: T) -> RpcServerConfig<T> {
1183 RpcServerConfig {
1184 http_server_config: self.http_server_config,
1185 http_cors_domains: self.http_cors_domains,
1186 http_addr: self.http_addr,
1187 http_disable_compression: self.http_disable_compression,
1188 ws_server_config: self.ws_server_config,
1189 ws_cors_domains: self.ws_cors_domains,
1190 ws_addr: self.ws_addr,
1191 ipc_server_config: self.ipc_server_config,
1192 ipc_endpoint: self.ipc_endpoint,
1193 jwt_secret: self.jwt_secret,
1194 rpc_metrics_enabled: self.rpc_metrics_enabled,
1195 rpc_middleware,
1196 }
1197 }
1198
    /// Sets whether RPC request metrics are enabled.
    pub const fn with_rpc_metrics_enabled(mut self, enabled: bool) -> Self {
        self.rpc_metrics_enabled = enabled;
        self
    }
1204
1205 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
1207 self.with_http_cors(cors_domain.clone()).with_ws_cors(cors_domain)
1208 }
1209
1210 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
1212 self.ws_cors_domains = cors_domain;
1213 self
1214 }
1215
    /// Sets whether compression is disabled for HTTP.
    pub const fn with_http_disable_compression(mut self, http_disable_compression: bool) -> Self {
        self.http_disable_compression = http_disable_compression;
        self
    }
1221
1222 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
1224 self.http_cors_domains = cors_domain;
1225 self
1226 }
1227
    /// Sets the socket address for the HTTP server.
    ///
    /// When no address is set, `start` uses localhost with `constants::DEFAULT_HTTP_RPC_PORT`.
    pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
        self.http_addr = Some(addr);
        self
    }
1236
    /// Sets the socket address for the WS server.
    ///
    /// When no address is set, `start` uses localhost with `constants::DEFAULT_WS_RPC_PORT`.
    pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
        self.ws_addr = Some(addr);
        self
    }
1245
1246 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
1250 where
1251 I: IdProvider + Clone + 'static,
1252 {
1253 if let Some(config) = self.http_server_config {
1254 self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
1255 }
1256 if let Some(config) = self.ws_server_config {
1257 self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
1258 }
1259 if let Some(ipc) = self.ipc_server_config {
1260 self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
1261 }
1262
1263 self
1264 }
1265
1266 pub fn with_ipc_endpoint(mut self, path: impl Into<String>) -> Self {
1270 self.ipc_endpoint = Some(path.into());
1271 self
1272 }
1273
    /// Sets the optional JWT secret, replacing any previous value.
    pub const fn with_jwt_secret(mut self, secret: Option<JwtSecret>) -> Self {
        self.jwt_secret = secret;
        self
    }
1279
1280 pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
1282 let Some(tokio_runtime) = tokio_runtime else { return self };
1283 if let Some(http_server_config) = self.http_server_config {
1284 self.http_server_config =
1285 Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1286 }
1287 if let Some(ws_server_config) = self.ws_server_config {
1288 self.ws_server_config =
1289 Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1290 }
1291 if let Some(ipc_server_config) = self.ipc_server_config {
1292 self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
1293 }
1294 self
1295 }
1296
1297 pub const fn has_server(&self) -> bool {
1301 self.http_server_config.is_some() ||
1302 self.ws_server_config.is_some() ||
1303 self.ipc_server_config.is_some()
1304 }
1305
    /// Returns the configured HTTP address, or `None` if none was set.
    pub const fn http_address(&self) -> Option<SocketAddr> {
        self.http_addr
    }
1310
    /// Returns the configured WS address, or `None` if none was set.
    pub const fn ws_address(&self) -> Option<SocketAddr> {
        self.ws_addr
    }
1315
    /// Returns a clone of the configured IPC endpoint, or `None` if none was set.
    pub fn ipc_endpoint(&self) -> Option<String> {
        self.ipc_endpoint.clone()
    }
1320
    /// Returns whether RPC request metrics are enabled.
    pub const fn rpc_metrics_enabled(&self) -> bool {
        self.rpc_metrics_enabled
    }
1325
1326 fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
1328 cors.as_deref().map(cors::create_cors_layer).transpose()
1329 }
1330
1331 fn maybe_jwt_layer(jwt_secret: Option<JwtSecret>) -> Option<AuthLayer<JwtAuthValidator>> {
1333 jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
1334 }
1335
1336 fn maybe_compression_layer(disable_compression: bool) -> Option<CompressionLayer> {
1339 if disable_compression {
1340 None
1341 } else {
1342 Some(CompressionLayer::new())
1343 }
1344 }
1345
    /// Builds and starts the configured servers (http, ws, ipc) with the given `modules`.
    ///
    /// If http and ws are both configured and their addresses compare equal, a single server
    /// is started that serves both transports; otherwise each transport gets its own server.
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// - a server fails to build or its local address cannot be read,
    /// - http and ws share a server but their CORS domains differ (after trimming),
    /// - http and ws share a server but their module selections are not identical,
    /// - a CORS layer cannot be created from the configured domains.
    ///
    /// # Panics
    ///
    /// Panics if the ipc server is configured but `modules` holds no ipc module, or if http and
    /// ws run as separate servers and `modules` lacks the module of a configured transport.
    pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError>
    where
        RpcMiddleware: RethRpcMiddleware,
    {
        let mut http_handle = None;
        let mut ws_handle = None;
        let mut ipc_handle = None;

        // Fall back to localhost with the default port when no address was configured.
        let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
            Ipv4Addr::LOCALHOST,
            constants::DEFAULT_HTTP_RPC_PORT,
        )));

        let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
            Ipv4Addr::LOCALHOST,
            constants::DEFAULT_WS_RPC_PORT,
        )));

        let rpc_metrics_enabled = self.rpc_metrics_enabled;
        let ipc_path =
            self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());

        // The ipc server is started first and independently of the http/ws handling below.
        if let Some(builder) = self.ipc_server_config {
            let ipc = builder
                .set_rpc_middleware(
                    IpcRpcServiceBuilder::new().option_layer(
                        rpc_metrics_enabled
                            .then(|| modules.ipc.as_ref().map(RpcRequestMetrics::ipc))
                            .flatten(),
                    ),
                )
                .build(ipc_path);
            ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
        }

        // Same-address case: http and ws are served by one combined server.
        if self.http_addr == self.ws_addr &&
            self.http_server_config.is_some() &&
            self.ws_server_config.is_some()
        {
            // Both transports must agree on CORS; if only one is set, that one is used.
            let cors = match (self.ws_cors_domains.as_ref(), self.http_cors_domains.as_ref()) {
                (Some(ws_cors), Some(http_cors)) => {
                    if ws_cors.trim() != http_cors.trim() {
                        return Err(WsHttpSamePortError::ConflictingCorsDomains {
                            http_cors_domains: Some(http_cors.clone()),
                            ws_cors_domains: Some(ws_cors.clone()),
                        }
                        .into());
                    }
                    Some(ws_cors)
                }
                (a, b) => a.or(b),
            }
            .cloned();

            // A shared server can only expose one set of modules.
            modules.config.ensure_ws_http_identical()?;

            // Only the http server config is used for the combined server.
            if let Some(config) = self.http_server_config {
                let server = ServerBuilder::new()
                    .set_http_middleware(
                        tower::ServiceBuilder::new()
                            .option_layer(Self::maybe_cors_layer(cors)?)
                            .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
                            .option_layer(Self::maybe_compression_layer(
                                self.http_disable_compression,
                            )),
                    )
                    .set_rpc_middleware(
                        RpcServiceBuilder::default()
                            .option_layer(
                                rpc_metrics_enabled
                                    .then(|| {
                                        modules
                                            .http
                                            .as_ref()
                                            .or(modules.ws.as_ref())
                                            .map(RpcRequestMetrics::same_port)
                                    })
                                    .flatten(),
                            )
                            .layer(self.rpc_middleware.clone()),
                    )
                    .set_config(config.build())
                    .build(http_socket_addr)
                    .await
                    .map_err(|err| {
                        RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
                    })?;
                let addr = server.local_addr().map_err(|err| {
                    RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
                })?;
                // Both handles refer to the same underlying server.
                if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
                    let handle = server.start(module.clone());
                    http_handle = Some(handle.clone());
                    ws_handle = Some(handle);
                }
                return Ok(RpcServerHandle {
                    http_local_addr: Some(addr),
                    ws_local_addr: Some(addr),
                    http: http_handle,
                    ws: ws_handle,
                    ipc_endpoint: self.ipc_endpoint.clone(),
                    ipc: ipc_handle,
                    jwt_secret: self.jwt_secret,
                });
            }
        }

        // Separate servers: build (bind) both first, start them afterwards.
        let mut ws_local_addr = None;
        let mut ws_server = None;
        let mut http_local_addr = None;
        let mut http_server = None;

        if let Some(config) = self.ws_server_config {
            // Note: no compression layer is installed on the ws-only server.
            let server = ServerBuilder::new()
                .set_config(config.ws_only().build())
                .set_http_middleware(
                    tower::ServiceBuilder::new()
                        .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
                        .option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
                )
                .set_rpc_middleware(
                    RpcServiceBuilder::default()
                        .option_layer(
                            rpc_metrics_enabled
                                .then(|| modules.ws.as_ref().map(RpcRequestMetrics::ws))
                                .flatten(),
                        )
                        .layer(self.rpc_middleware.clone()),
                )
                .build(ws_socket_addr)
                .await
                .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

            let addr = server
                .local_addr()
                .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

            ws_local_addr = Some(addr);
            ws_server = Some(server);
        }

        if let Some(config) = self.http_server_config {
            let server = ServerBuilder::new()
                .set_config(config.http_only().build())
                .set_http_middleware(
                    tower::ServiceBuilder::new()
                        .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
                        .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
                        .option_layer(Self::maybe_compression_layer(self.http_disable_compression)),
                )
                .set_rpc_middleware(
                    RpcServiceBuilder::default()
                        .option_layer(
                            rpc_metrics_enabled
                                .then(|| modules.http.as_ref().map(RpcRequestMetrics::http))
                                .flatten(),
                        )
                        .layer(self.rpc_middleware.clone()),
                )
                .build(http_socket_addr)
                .await
                .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
            let local_addr = server
                .local_addr()
                .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
            http_local_addr = Some(local_addr);
            http_server = Some(server);
        }

        // Start whichever servers were built; a built server without its module is a bug.
        http_handle = http_server
            .map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
        ws_handle = ws_server
            .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
        Ok(RpcServerHandle {
            http_local_addr,
            ws_local_addr,
            http: http_handle,
            ws: ws_handle,
            ipc_endpoint: self.ipc_endpoint.clone(),
            ipc: ipc_handle,
            jwt_secret: self.jwt_secret,
        })
    }
1536}
1537
/// Holds the RPC module selection for each transport (http, ws, ipc).
///
/// A transport whose selection is `None` has no modules configured.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct TransportRpcModuleConfig {
    /// Module selection for the http transport.
    http: Option<RpcModuleSelection>,
    /// Module selection for the ws transport.
    ws: Option<RpcModuleSelection>,
    /// Module selection for the ipc transport.
    ipc: Option<RpcModuleSelection>,
    /// Optional additional configuration for the modules.
    config: Option<RpcModuleConfig>,
}
1560
1561impl TransportRpcModuleConfig {
1564 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
1566 Self::default().with_http(http)
1567 }
1568
1569 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
1571 Self::default().with_ws(ws)
1572 }
1573
1574 pub fn set_ipc(ipc: impl Into<RpcModuleSelection>) -> Self {
1576 Self::default().with_ipc(ipc)
1577 }
1578
1579 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
1581 self.http = Some(http.into());
1582 self
1583 }
1584
1585 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
1587 self.ws = Some(ws.into());
1588 self
1589 }
1590
1591 pub fn with_ipc(mut self, ipc: impl Into<RpcModuleSelection>) -> Self {
1593 self.ipc = Some(ipc.into());
1594 self
1595 }
1596
1597 pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
1599 self.config = Some(config);
1600 self
1601 }
1602
1603 pub const fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1605 &mut self.http
1606 }
1607
1608 pub const fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1610 &mut self.ws
1611 }
1612
1613 pub const fn ipc_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1615 &mut self.ipc
1616 }
1617
1618 pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
1620 &mut self.config
1621 }
1622
1623 pub const fn is_empty(&self) -> bool {
1625 self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
1626 }
1627
1628 pub const fn http(&self) -> Option<&RpcModuleSelection> {
1630 self.http.as_ref()
1631 }
1632
1633 pub const fn ws(&self) -> Option<&RpcModuleSelection> {
1635 self.ws.as_ref()
1636 }
1637
1638 pub const fn ipc(&self) -> Option<&RpcModuleSelection> {
1640 self.ipc.as_ref()
1641 }
1642
1643 pub const fn config(&self) -> Option<&RpcModuleConfig> {
1645 self.config.as_ref()
1646 }
1647
1648 pub fn contains_any(&self, module: &RethRpcModule) -> bool {
1650 self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
1651 }
1652
1653 pub fn contains_http(&self, module: &RethRpcModule) -> bool {
1655 self.http.as_ref().is_some_and(|http| http.contains(module))
1656 }
1657
1658 pub fn contains_ws(&self, module: &RethRpcModule) -> bool {
1660 self.ws.as_ref().is_some_and(|ws| ws.contains(module))
1661 }
1662
1663 pub fn contains_ipc(&self, module: &RethRpcModule) -> bool {
1665 self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
1666 }
1667
1668 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
1671 if RpcModuleSelection::are_identical(self.http.as_ref(), self.ws.as_ref()) {
1672 Ok(())
1673 } else {
1674 let http_modules =
1675 self.http.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1676 let ws_modules =
1677 self.ws.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1678
1679 let http_not_ws = http_modules.difference(&ws_modules).cloned().collect();
1680 let ws_not_http = ws_modules.difference(&http_modules).cloned().collect();
1681 let overlap = http_modules.intersection(&ws_modules).cloned().collect();
1682
1683 Err(WsHttpSamePortError::ConflictingModules(Box::new(ConflictingModules {
1684 overlap,
1685 http_not_ws,
1686 ws_not_http,
1687 })))
1688 }
1689 }
1690}
1691
/// Holds the installed [`RpcModule`] for each transport together with the module config.
///
/// A transport whose module is `None` is skipped by the merge/remove/replace helpers.
#[derive(Debug, Clone, Default)]
pub struct TransportRpcModules<Context = ()> {
    /// The module selection per transport.
    config: TransportRpcModuleConfig,
    /// Module served over http, if any.
    http: Option<RpcModule<Context>>,
    /// Module served over ws, if any.
    ws: Option<RpcModule<Context>>,
    /// Module served over ipc, if any.
    ipc: Option<RpcModule<Context>>,
}
1704
1705impl TransportRpcModules {
1708 pub fn with_config(mut self, config: TransportRpcModuleConfig) -> Self {
1711 self.config = config;
1712 self
1713 }
1714
1715 pub fn with_http(mut self, http: RpcModule<()>) -> Self {
1718 self.http = Some(http);
1719 self
1720 }
1721
1722 pub fn with_ws(mut self, ws: RpcModule<()>) -> Self {
1725 self.ws = Some(ws);
1726 self
1727 }
1728
1729 pub fn with_ipc(mut self, ipc: RpcModule<()>) -> Self {
1732 self.ipc = Some(ipc);
1733 self
1734 }
1735
1736 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
1738 &self.config
1739 }
1740
1741 pub fn merge_if_module_configured(
1746 &mut self,
1747 module: RethRpcModule,
1748 other: impl Into<Methods>,
1749 ) -> Result<(), RegisterMethodError> {
1750 let other = other.into();
1751 if self.module_config().contains_http(&module) {
1752 self.merge_http(other.clone())?;
1753 }
1754 if self.module_config().contains_ws(&module) {
1755 self.merge_ws(other.clone())?;
1756 }
1757 if self.module_config().contains_ipc(&module) {
1758 self.merge_ipc(other)?;
1759 }
1760
1761 Ok(())
1762 }
1763
1764 pub fn merge_if_module_configured_with<F>(
1771 &mut self,
1772 module: RethRpcModule,
1773 f: F,
1774 ) -> Result<(), RegisterMethodError>
1775 where
1776 F: FnOnce() -> Methods,
1777 {
1778 if !self.module_config().contains_any(&module) {
1780 return Ok(());
1781 }
1782 self.merge_if_module_configured(module, f())
1783 }
1784
1785 pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1791 if let Some(ref mut http) = self.http {
1792 return http.merge(other.into()).map(|_| true)
1793 }
1794 Ok(false)
1795 }
1796
1797 pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1803 if let Some(ref mut ws) = self.ws {
1804 return ws.merge(other.into()).map(|_| true)
1805 }
1806 Ok(false)
1807 }
1808
1809 pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1815 if let Some(ref mut ipc) = self.ipc {
1816 return ipc.merge(other.into()).map(|_| true)
1817 }
1818 Ok(false)
1819 }
1820
1821 pub fn merge_configured(
1825 &mut self,
1826 other: impl Into<Methods>,
1827 ) -> Result<(), RegisterMethodError> {
1828 let other = other.into();
1829 self.merge_http(other.clone())?;
1830 self.merge_ws(other.clone())?;
1831 self.merge_ipc(other)?;
1832 Ok(())
1833 }
1834
1835 pub fn methods_by_module(&self, module: RethRpcModule) -> Methods {
1839 self.methods_by(|name| name.starts_with(module.as_str()))
1840 }
1841
1842 pub fn methods_by<F>(&self, mut filter: F) -> Methods
1846 where
1847 F: FnMut(&str) -> bool,
1848 {
1849 let mut methods = Methods::new();
1850
1851 let mut f =
1853 |name: &str, mm: &Methods| filter(name) && !mm.method_names().any(|m| m == name);
1854
1855 if let Some(m) = self.http_methods(|name| f(name, &methods)) {
1856 let _ = methods.merge(m);
1857 }
1858 if let Some(m) = self.ws_methods(|name| f(name, &methods)) {
1859 let _ = methods.merge(m);
1860 }
1861 if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
1862 let _ = methods.merge(m);
1863 }
1864 methods
1865 }
1866
1867 pub fn http_methods<F>(&self, filter: F) -> Option<Methods>
1871 where
1872 F: FnMut(&str) -> bool,
1873 {
1874 self.http.as_ref().map(|module| methods_by(module, filter))
1875 }
1876
1877 pub fn ws_methods<F>(&self, filter: F) -> Option<Methods>
1881 where
1882 F: FnMut(&str) -> bool,
1883 {
1884 self.ws.as_ref().map(|module| methods_by(module, filter))
1885 }
1886
1887 pub fn ipc_methods<F>(&self, filter: F) -> Option<Methods>
1891 where
1892 F: FnMut(&str) -> bool,
1893 {
1894 self.ipc.as_ref().map(|module| methods_by(module, filter))
1895 }
1896
1897 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
1905 if let Some(http_module) = &mut self.http {
1906 http_module.remove_method(method_name).is_some()
1907 } else {
1908 false
1909 }
1910 }
1911
1912 pub fn remove_http_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1914 for name in methods {
1915 self.remove_http_method(name);
1916 }
1917 }
1918
1919 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
1927 if let Some(ws_module) = &mut self.ws {
1928 ws_module.remove_method(method_name).is_some()
1929 } else {
1930 false
1931 }
1932 }
1933
1934 pub fn remove_ws_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1936 for name in methods {
1937 self.remove_ws_method(name);
1938 }
1939 }
1940
1941 pub fn remove_ipc_method(&mut self, method_name: &'static str) -> bool {
1949 if let Some(ipc_module) = &mut self.ipc {
1950 ipc_module.remove_method(method_name).is_some()
1951 } else {
1952 false
1953 }
1954 }
1955
1956 pub fn remove_ipc_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1958 for name in methods {
1959 self.remove_ipc_method(name);
1960 }
1961 }
1962
1963 pub fn remove_method_from_configured(&mut self, method_name: &'static str) -> bool {
1967 let http_removed = self.remove_http_method(method_name);
1968 let ws_removed = self.remove_ws_method(method_name);
1969 let ipc_removed = self.remove_ipc_method(method_name);
1970
1971 http_removed || ws_removed || ipc_removed
1972 }
1973
1974 pub fn rename(
1978 &mut self,
1979 old_name: &'static str,
1980 new_method: impl Into<Methods>,
1981 ) -> Result<(), RegisterMethodError> {
1982 self.remove_method_from_configured(old_name);
1984
1985 self.merge_configured(new_method)
1987 }
1988
1989 pub fn replace_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1996 let other = other.into();
1997 self.remove_http_methods(other.method_names());
1998 self.merge_http(other)
1999 }
2000
2001 pub fn replace_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
2008 let other = other.into();
2009 self.remove_ipc_methods(other.method_names());
2010 self.merge_ipc(other)
2011 }
2012
2013 pub fn replace_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
2020 let other = other.into();
2021 self.remove_ws_methods(other.method_names());
2022 self.merge_ws(other)
2023 }
2024
2025 pub fn replace_configured(
2029 &mut self,
2030 other: impl Into<Methods>,
2031 ) -> Result<bool, RegisterMethodError> {
2032 let other = other.into();
2033 self.replace_http(other.clone())?;
2034 self.replace_ws(other.clone())?;
2035 self.replace_ipc(other)?;
2036 Ok(true)
2037 }
2038
2039 pub fn add_or_replace_http(
2043 &mut self,
2044 other: impl Into<Methods>,
2045 ) -> Result<bool, RegisterMethodError> {
2046 let other = other.into();
2047 self.remove_http_methods(other.method_names());
2048 self.merge_http(other)
2049 }
2050
2051 pub fn add_or_replace_ws(
2055 &mut self,
2056 other: impl Into<Methods>,
2057 ) -> Result<bool, RegisterMethodError> {
2058 let other = other.into();
2059 self.remove_ws_methods(other.method_names());
2060 self.merge_ws(other)
2061 }
2062
2063 pub fn add_or_replace_ipc(
2067 &mut self,
2068 other: impl Into<Methods>,
2069 ) -> Result<bool, RegisterMethodError> {
2070 let other = other.into();
2071 self.remove_ipc_methods(other.method_names());
2072 self.merge_ipc(other)
2073 }
2074
2075 pub fn add_or_replace_configured(
2077 &mut self,
2078 other: impl Into<Methods>,
2079 ) -> Result<(), RegisterMethodError> {
2080 let other = other.into();
2081 self.add_or_replace_http(other.clone())?;
2082 self.add_or_replace_ws(other.clone())?;
2083 self.add_or_replace_ipc(other)?;
2084 Ok(())
2085 }
2086 pub fn add_or_replace_if_module_configured(
2089 &mut self,
2090 module: RethRpcModule,
2091 other: impl Into<Methods>,
2092 ) -> Result<(), RegisterMethodError> {
2093 let other = other.into();
2094 if self.module_config().contains_http(&module) {
2095 self.add_or_replace_http(other.clone())?;
2096 }
2097 if self.module_config().contains_ws(&module) {
2098 self.add_or_replace_ws(other.clone())?;
2099 }
2100 if self.module_config().contains_ipc(&module) {
2101 self.add_or_replace_ipc(other)?;
2102 }
2103 Ok(())
2104 }
2105}
2106
2107fn methods_by<T, F>(module: &RpcModule<T>, mut filter: F) -> Methods
2109where
2110 F: FnMut(&str) -> bool,
2111{
2112 let mut methods = Methods::new();
2113 let method_names = module.method_names().filter(|name| filter(name));
2114
2115 for name in method_names {
2116 if let Some(matched_method) = module.method(name).cloned() {
2117 let _ = methods.verify_and_insert(name, matched_method);
2118 }
2119 }
2120
2121 methods
2122}
2123
/// A handle to the http, ws and ipc servers that were started.
///
/// Besides stopping the servers it can hand out clients and providers connected to them.
#[derive(Clone, Debug)]
#[must_use = "Server stops if dropped"]
pub struct RpcServerHandle {
    /// The local address the http server is bound to, if it was started.
    http_local_addr: Option<SocketAddr>,
    /// The local address the ws server is bound to, if it was started.
    ws_local_addr: Option<SocketAddr>,
    /// Handle of the http server.
    http: Option<ServerHandle>,
    /// Handle of the ws server; shares the http server when both run on one address.
    ws: Option<ServerHandle>,
    /// The IPC endpoint as configured on the server config, if any.
    ipc_endpoint: Option<String>,
    /// Handle of the ipc server.
    ipc: Option<jsonrpsee::server::ServerHandle>,
    /// JWT secret used to create bearer tokens for the clients built from this handle.
    jwt_secret: Option<JwtSecret>,
}
2140
2141impl RpcServerHandle {
2144 fn bearer_token(&self) -> Option<String> {
2146 self.jwt_secret.as_ref().map(|secret| {
2147 format!(
2148 "Bearer {}",
2149 secret
2150 .encode(&Claims {
2151 iat: (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +
2152 Duration::from_secs(60))
2153 .as_secs(),
2154 exp: None,
2155 })
2156 .unwrap()
2157 )
2158 })
2159 }
2160 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
2162 self.http_local_addr
2163 }
2164
2165 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
2167 self.ws_local_addr
2168 }
2169
2170 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
2172 if let Some(handle) = self.http {
2173 handle.stop()?
2174 }
2175
2176 if let Some(handle) = self.ws {
2177 handle.stop()?
2178 }
2179
2180 if let Some(handle) = self.ipc {
2181 handle.stop()?
2182 }
2183
2184 Ok(())
2185 }
2186
2187 pub fn ipc_endpoint(&self) -> Option<String> {
2189 self.ipc_endpoint.clone()
2190 }
2191
2192 pub fn http_url(&self) -> Option<String> {
2194 self.http_local_addr.map(|addr| format!("http://{addr}"))
2195 }
2196
2197 pub fn ws_url(&self) -> Option<String> {
2199 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
2200 }
2201
2202 pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
2204 let url = self.http_url()?;
2205
2206 let client = if let Some(token) = self.bearer_token() {
2207 jsonrpsee::http_client::HttpClientBuilder::default()
2208 .set_headers(HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]))
2209 .build(url)
2210 } else {
2211 jsonrpsee::http_client::HttpClientBuilder::default().build(url)
2212 };
2213
2214 client.expect("failed to create http client").into()
2215 }
2216
2217 pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
2219 let url = self.ws_url()?;
2220 let mut builder = jsonrpsee::ws_client::WsClientBuilder::default();
2221
2222 if let Some(token) = self.bearer_token() {
2223 let headers = HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]);
2224 builder = builder.set_headers(headers);
2225 }
2226
2227 let client = builder.build(url).await.expect("failed to create ws client");
2228 Some(client)
2229 }
2230
2231 pub fn eth_http_provider(
2233 &self,
2234 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2235 self.new_http_provider_for()
2236 }
2237
2238 pub fn eth_http_provider_with_wallet<W>(
2241 &self,
2242 wallet: W,
2243 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2244 where
2245 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2246 {
2247 let rpc_url = self.http_url()?;
2248 let provider =
2249 ProviderBuilder::new().wallet(wallet).connect_http(rpc_url.parse().expect("valid url"));
2250 Some(provider)
2251 }
2252
2253 pub fn new_http_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2258 where
2259 N: RecommendedFillers<RecommendedFillers: Unpin>,
2260 {
2261 let rpc_url = self.http_url()?;
2262 let provider = ProviderBuilder::default()
2263 .with_recommended_fillers()
2264 .connect_http(rpc_url.parse().expect("valid url"));
2265 Some(provider)
2266 }
2267
2268 pub async fn eth_ws_provider(
2270 &self,
2271 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2272 self.new_ws_provider_for().await
2273 }
2274
2275 pub async fn eth_ws_provider_with_wallet<W>(
2278 &self,
2279 wallet: W,
2280 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2281 where
2282 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2283 {
2284 let rpc_url = self.ws_url()?;
2285 let provider = ProviderBuilder::new()
2286 .wallet(wallet)
2287 .connect(&rpc_url)
2288 .await
2289 .expect("failed to create ws client");
2290 Some(provider)
2291 }
2292
2293 pub async fn new_ws_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2298 where
2299 N: RecommendedFillers<RecommendedFillers: Unpin>,
2300 {
2301 let rpc_url = self.ws_url()?;
2302 let provider = ProviderBuilder::default()
2303 .with_recommended_fillers()
2304 .connect(&rpc_url)
2305 .await
2306 .expect("failed to create ws client");
2307 Some(provider)
2308 }
2309
2310 pub async fn eth_ipc_provider(
2312 &self,
2313 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2314 self.new_ipc_provider_for().await
2315 }
2316
2317 pub async fn new_ipc_provider_for<N>(
2322 &self,
2323 ) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2324 where
2325 N: RecommendedFillers<RecommendedFillers: Unpin>,
2326 {
2327 let rpc_url = self.ipc_endpoint()?;
2328 let provider = ProviderBuilder::default()
2329 .with_recommended_fillers()
2330 .connect(&rpc_url)
2331 .await
2332 .expect("failed to create ipc client");
2333 Some(provider)
2334 }
2335}
2336
2337#[cfg(test)]
2338mod tests {
2339 use super::*;
2340
2341 #[test]
2342 fn parse_eth_call_bundle_selection() {
2343 let selection = "eth,admin,debug".parse::<RpcModuleSelection>().unwrap();
2344 assert_eq!(
2345 selection,
2346 RpcModuleSelection::Selection(
2347 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Debug,].into()
2348 )
2349 );
2350 }
2351
2352 #[test]
2353 fn parse_rpc_module_selection() {
2354 let selection = "all".parse::<RpcModuleSelection>().unwrap();
2355 assert_eq!(selection, RpcModuleSelection::All);
2356 }
2357
2358 #[test]
2359 fn parse_rpc_module_selection_none() {
2360 let selection = "none".parse::<RpcModuleSelection>().unwrap();
2361 assert_eq!(selection, RpcModuleSelection::Selection(Default::default()));
2362 }
2363
2364 #[test]
2365 fn parse_rpc_unique_module_selection() {
2366 let selection = "eth,admin,eth,net".parse::<RpcModuleSelection>().unwrap();
2367 assert_eq!(
2368 selection,
2369 RpcModuleSelection::Selection(
2370 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Net,].into()
2371 )
2372 );
2373 }
2374
2375 #[test]
2376 fn identical_selection() {
2377 assert!(RpcModuleSelection::are_identical(
2378 Some(&RpcModuleSelection::All),
2379 Some(&RpcModuleSelection::All),
2380 ));
2381 assert!(!RpcModuleSelection::are_identical(
2382 Some(&RpcModuleSelection::All),
2383 Some(&RpcModuleSelection::Standard),
2384 ));
2385 assert!(RpcModuleSelection::are_identical(
2386 Some(&RpcModuleSelection::Selection(RpcModuleSelection::Standard.to_selection())),
2387 Some(&RpcModuleSelection::Standard),
2388 ));
2389 assert!(RpcModuleSelection::are_identical(
2390 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2391 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2392 ));
2393 assert!(RpcModuleSelection::are_identical(
2394 None,
2395 Some(&RpcModuleSelection::Selection(Default::default())),
2396 ));
2397 assert!(RpcModuleSelection::are_identical(
2398 Some(&RpcModuleSelection::Selection(Default::default())),
2399 None,
2400 ));
2401 assert!(RpcModuleSelection::are_identical(None, None));
2402 }
2403
2404 #[test]
2405 fn test_rpc_module_str() {
2406 macro_rules! assert_rpc_module {
2407 ($($s:expr => $v:expr,)*) => {
2408 $(
2409 let val: RethRpcModule = $s.parse().unwrap();
2410 assert_eq!(val, $v);
2411 assert_eq!(val.to_string(), $s);
2412 )*
2413 };
2414 }
2415 assert_rpc_module!
2416 (
2417 "admin" => RethRpcModule::Admin,
2418 "debug" => RethRpcModule::Debug,
2419 "eth" => RethRpcModule::Eth,
2420 "net" => RethRpcModule::Net,
2421 "trace" => RethRpcModule::Trace,
2422 "web3" => RethRpcModule::Web3,
2423 "rpc" => RethRpcModule::Rpc,
2424 "ots" => RethRpcModule::Ots,
2425 "reth" => RethRpcModule::Reth,
2426 );
2427 }
2428
2429 #[test]
2430 fn test_default_selection() {
2431 let selection = RpcModuleSelection::Standard.to_selection();
2432 assert_eq!(selection, [RethRpcModule::Eth, RethRpcModule::Net, RethRpcModule::Web3].into())
2433 }
2434
2435 #[test]
2436 fn test_create_rpc_module_config() {
2437 let selection = vec!["eth", "admin"];
2438 let config = RpcModuleSelection::try_from_selection(selection).unwrap();
2439 assert_eq!(
2440 config,
2441 RpcModuleSelection::Selection([RethRpcModule::Eth, RethRpcModule::Admin].into())
2442 );
2443 }
2444
2445 #[test]
2446 fn test_configure_transport_config() {
2447 let config = TransportRpcModuleConfig::default()
2448 .with_http([RethRpcModule::Eth, RethRpcModule::Admin]);
2449 assert_eq!(
2450 config,
2451 TransportRpcModuleConfig {
2452 http: Some(RpcModuleSelection::Selection(
2453 [RethRpcModule::Eth, RethRpcModule::Admin].into()
2454 )),
2455 ws: None,
2456 ipc: None,
2457 config: None,
2458 }
2459 )
2460 }
2461
2462 #[test]
2463 fn test_configure_transport_config_none() {
2464 let config = TransportRpcModuleConfig::default().with_http(Vec::<RethRpcModule>::new());
2465 assert_eq!(
2466 config,
2467 TransportRpcModuleConfig {
2468 http: Some(RpcModuleSelection::Selection(Default::default())),
2469 ws: None,
2470 ipc: None,
2471 config: None,
2472 }
2473 )
2474 }
2475
2476 fn create_test_module() -> RpcModule<()> {
2477 let mut module = RpcModule::new(());
2478 module.register_method("anything", |_, _, _| "succeed").unwrap();
2479 module
2480 }
2481
2482 #[test]
2483 fn test_remove_http_method() {
2484 let mut modules =
2485 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2486 assert!(modules.remove_http_method("anything"));
2488
2489 assert!(!modules.remove_http_method("non_existent_method"));
2491
2492 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2494 }
2495
2496 #[test]
2497 fn test_remove_ws_method() {
2498 let mut modules =
2499 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2500
2501 assert!(modules.remove_ws_method("anything"));
2503
2504 assert!(!modules.remove_ws_method("non_existent_method"));
2506
2507 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2509 }
2510
2511 #[test]
2512 fn test_remove_ipc_method() {
2513 let mut modules =
2514 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2515
2516 assert!(modules.remove_ipc_method("anything"));
2518
2519 assert!(!modules.remove_ipc_method("non_existent_method"));
2521
2522 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2524 }
2525
2526 #[test]
2527 fn test_remove_method_from_configured() {
2528 let mut modules = TransportRpcModules {
2529 http: Some(create_test_module()),
2530 ws: Some(create_test_module()),
2531 ipc: Some(create_test_module()),
2532 ..Default::default()
2533 };
2534
2535 assert!(modules.remove_method_from_configured("anything"));
2537
2538 assert!(!modules.remove_method_from_configured("anything"));
2540
2541 assert!(!modules.remove_method_from_configured("non_existent_method"));
2543
2544 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2546 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2547 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2548 }
2549
2550 #[test]
2551 fn test_transport_rpc_module_rename() {
2552 let mut modules = TransportRpcModules {
2553 http: Some(create_test_module()),
2554 ws: Some(create_test_module()),
2555 ipc: Some(create_test_module()),
2556 ..Default::default()
2557 };
2558
2559 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2561 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2562 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2563
2564 assert!(modules.http.as_ref().unwrap().method("something").is_none());
2566 assert!(modules.ws.as_ref().unwrap().method("something").is_none());
2567 assert!(modules.ipc.as_ref().unwrap().method("something").is_none());
2568
2569 let mut other_module = RpcModule::new(());
2571 other_module.register_method("something", |_, _, _| "fails").unwrap();
2572
2573 modules.rename("anything", other_module).expect("rename failed");
2575
2576 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2578 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2579 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2580
2581 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2583 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2584 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2585 }
2586
2587 #[test]
2588 fn test_replace_http_method() {
2589 let mut modules =
2590 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2591
2592 let mut other_module = RpcModule::new(());
2593 other_module.register_method("something", |_, _, _| "fails").unwrap();
2594
2595 assert!(modules.replace_http(other_module.clone()).unwrap());
2596
2597 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2598
2599 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2600 assert!(modules.replace_http(other_module.clone()).unwrap());
2601
2602 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2603 }
/// Applies `replace_ipc` twice to a container that only has the IPC transport
/// populated and checks that the call reports `true` and that the expected method
/// names are exposed afterwards.
#[test]
fn test_replace_ipc_method() {
    let mut modules =
        TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    // First pass: the replacement only carries `something`.
    let replaced = modules.replace_ipc(replacement.clone()).unwrap();
    assert!(replaced);
    let ipc = modules.ipc.as_ref().unwrap();
    assert!(ipc.method("something").is_some());

    // Second pass: the replacement additionally carries `anything`.
    replacement.register_method("anything", |_, _, _| "fails").unwrap();
    let replaced = modules.replace_ipc(replacement).unwrap();
    assert!(replaced);
    let ipc = modules.ipc.as_ref().unwrap();
    assert!(ipc.method("anything").is_some());
}
/// Applies `replace_ws` twice to a container that only has the WS transport
/// populated and checks that the call reports `true` and that the expected method
/// names are exposed afterwards.
#[test]
fn test_replace_ws_method() {
    let mut modules =
        TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    // First pass: the replacement only carries `something`.
    let replaced = modules.replace_ws(replacement.clone()).unwrap();
    assert!(replaced);
    let ws = modules.ws.as_ref().unwrap();
    assert!(ws.method("something").is_some());

    // Second pass: the replacement additionally carries `anything`.
    replacement.register_method("anything", |_, _, _| "fails").unwrap();
    let replaced = modules.replace_ws(replacement).unwrap();
    assert!(replaced);
    let ws = modules.ws.as_ref().unwrap();
    assert!(ws.method("anything").is_some());
}
2638
/// Runs `replace_configured` against a container with HTTP, WS and IPC all populated
/// and checks that every transport exposes both `something` and `anything` afterwards.
#[test]
fn test_replace_configured() {
    let mut modules = TransportRpcModules {
        http: Some(create_test_module()),
        ws: Some(create_test_module()),
        ipc: Some(create_test_module()),
        ..Default::default()
    };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    let replaced = modules.replace_configured(replacement).unwrap();
    assert!(replaced);

    // `anything` is never registered on the replacement, so it has to come from the
    // modules the transports started with — presumably `create_test_module` registers
    // it; confirm against that helper.
    let transports = [&modules.http, &modules.ipc, &modules.ws];
    for name in ["something", "anything"] {
        for transport in transports {
            assert!(transport.as_ref().unwrap().method(name).is_some());
        }
    }
}
2661
/// Calls `add_or_replace_if_module_configured` for `RethRpcModule::Eth` when only
/// HTTP and WS list that module in the config, and checks that the new methods show
/// up on HTTP and WS while the IPC module stays empty.
#[test]
fn test_add_or_replace_if_module_configured() {
    // `Eth` is configured for HTTP and WS only; IPC gets no module configuration.
    let config = TransportRpcModuleConfig::default()
        .with_http([RethRpcModule::Eth])
        .with_ws([RethRpcModule::Eth]);

    // Builds a module that already exposes `eth_existing`.
    let module_with_existing = || {
        let mut module = RpcModule::new(());
        module.register_method("eth_existing", |_, _, _| "original").unwrap();
        module
    };

    let mut modules = TransportRpcModules {
        config,
        http: Some(module_with_existing()),
        ws: Some(module_with_existing()),
        // IPC starts out without any registered method.
        ipc: Some(RpcModule::new(())),
    };

    // Incoming methods: one name that collides with an existing method, one new name.
    let mut incoming = RpcModule::new(());
    incoming.register_method("eth_existing", |_, _, _| "replaced").unwrap();
    incoming.register_method("eth_new", |_, _, _| "added").unwrap();
    let new_methods = Methods::from(incoming);

    let result = modules.add_or_replace_if_module_configured(RethRpcModule::Eth, new_methods);
    assert!(result.is_ok(), "Function should succeed");

    // Transports configured with `Eth` expose both names.
    for configured in [&modules.http, &modules.ws] {
        let module = configured.as_ref().unwrap();
        assert!(module.method("eth_existing").is_some());
        assert!(module.method("eth_new").is_some());
    }

    // The transport without `Eth` in its config exposes neither name.
    let ipc = modules.ipc.as_ref().unwrap();
    assert!(ipc.method("eth_existing").is_none());
    assert!(ipc.method("eth_new").is_none());
}
2713
/// Checks that `merge_if_module_configured_with` invokes its builder closure for a
/// module that is configured (`Eth` on HTTP) and does not invoke it for one that is
/// not (`Debug`).
#[test]
fn test_merge_if_module_configured_with_lazy_evaluation() {
    let config = TransportRpcModuleConfig::default().with_http([RethRpcModule::Eth]);
    let mut modules =
        TransportRpcModules { config, http: Some(RpcModule::new(())), ws: None, ipc: None };

    // Configured module: the closure must run and `eth_test` must land on HTTP.
    let mut eth_builder_ran = false;
    let eth_result = modules.merge_if_module_configured_with(RethRpcModule::Eth, || {
        eth_builder_ran = true;
        let mut methods = RpcModule::new(());
        methods.register_method("eth_test", |_, _, _| "test").unwrap();
        methods.into()
    });
    assert!(eth_result.is_ok());
    assert!(eth_builder_ran, "Closure should be called when module is configured");
    assert!(modules.http.as_ref().unwrap().method("eth_test").is_some());

    // Unconfigured module: the call still succeeds, but the closure must stay untouched.
    let mut debug_builder_ran = false;
    let debug_result = modules.merge_if_module_configured_with(RethRpcModule::Debug, || {
        debug_builder_ran = true;
        RpcModule::new(()).into()
    });
    assert!(debug_result.is_ok());
    assert!(!debug_builder_ran, "Closure should NOT be called when module is not configured");
}
2747}