1#![doc(
15 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
16 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
17 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
18)]
19#![cfg_attr(not(test), warn(unused_crate_dependencies))]
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
23use alloy_network::{Ethereum, IntoWallet};
24use alloy_provider::{fillers::RecommendedFillers, Provider, ProviderBuilder};
25use core::marker::PhantomData;
26use error::{ConflictingModules, RpcError, ServerKind};
27use http::{header::AUTHORIZATION, HeaderMap};
28use jsonrpsee::{
29 core::RegisterMethodError,
30 server::{middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider, ServerHandle},
31 Methods, RpcModule,
32};
33use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
34use reth_consensus::FullConsensus;
35use reth_engine_primitives::{ConsensusEngineEvent, ConsensusEngineHandle};
36use reth_evm::ConfigureEvm;
37use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
38use reth_payload_primitives::PayloadTypes;
39use reth_primitives_traits::{NodePrimitives, TxTy};
40use reth_rpc::{
41 AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
42 OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
43};
44use reth_rpc_api::servers::*;
45use reth_rpc_engine_api::RethEngineApi;
46use reth_rpc_eth_api::{
47 helpers::{
48 pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
49 TraceExt,
50 },
51 node::RpcNodeCoreAdapter,
52 EthApiServer, EthApiTypes, FullEthApiServer, FullEthApiTypes, RpcBlock, RpcConvert,
53 RpcConverter, RpcHeader, RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
54};
55use reth_rpc_eth_types::{receipt::EthReceiptConverter, EthConfig, EthSubscriptionIdProvider};
56use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
57pub use reth_rpc_server_types::RethRpcModule;
58use reth_storage_api::{
59 AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, NodePrimitivesProvider,
60 StateProviderFactory,
61};
62use reth_tasks::{pool::BlockingTaskGuard, Runtime};
63use reth_tokio_util::EventSender;
64use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
65use serde::{Deserialize, Serialize};
66use std::{
67 collections::HashMap,
68 fmt::Debug,
69 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
70 time::{Duration, SystemTime, UNIX_EPOCH},
71};
72use tower_http::cors::CorsLayer;
73
74pub use cors::CorsDomainError;
75
76pub use jsonrpsee::server::ServerBuilder;
78use jsonrpsee::server::ServerConfigBuilder;
79pub use reth_ipc::server::{
80 Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
81};
82pub use reth_rpc_server_types::{constants, RpcModuleSelection};
83pub use tower::layer::util::{Identity, Stack};
84
85pub mod auth;
87
88pub mod config;
90
91pub mod middleware;
93
94mod cors;
96
97pub mod error;
99
100pub mod eth;
102pub use eth::EthHandlers;
103
104mod metrics;
106use crate::middleware::RethRpcMiddleware;
107pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
108use reth_chain_state::{
109 CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions,
110};
111use reth_rpc::eth::sim_bundle::EthSimBundle;
112
113pub mod rate_limiter;
115
/// Builder that gathers the components used to construct RPC modules and registries.
///
/// Every component is a separate type parameter so it can be swapped individually through the
/// `with_*` methods. `N` is not stored; it is only carried as a [`PhantomData`] marker.
#[derive(Debug, Clone)]
pub struct RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus> {
    /// The provider component.
    provider: Provider,
    /// The transaction pool component.
    pool: Pool,
    /// The network component.
    network: Network,
    /// The runtime handed to the registry; `None` only when the builder was created via
    /// `Default` and no executor has been set yet.
    executor: Option<Runtime>,
    /// The EVM configuration component.
    evm_config: EvmConfig,
    /// The consensus component.
    consensus: Consensus,
    /// Marker for the primitives type `N`.
    _primitives: PhantomData<N>,
}
136
137impl<N, Provider, Pool, Network, EvmConfig, Consensus>
140 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
141{
142 pub const fn new(
144 provider: Provider,
145 pool: Pool,
146 network: Network,
147 executor: Runtime,
148 evm_config: EvmConfig,
149 consensus: Consensus,
150 ) -> Self {
151 Self {
152 provider,
153 pool,
154 network,
155 executor: Some(executor),
156 evm_config,
157 consensus,
158 _primitives: PhantomData,
159 }
160 }
161
162 pub fn with_provider<P>(
164 self,
165 provider: P,
166 ) -> RpcModuleBuilder<N, P, Pool, Network, EvmConfig, Consensus> {
167 let Self { pool, network, executor, evm_config, consensus, _primitives, .. } = self;
168 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
169 }
170
171 pub fn with_pool<P>(
173 self,
174 pool: P,
175 ) -> RpcModuleBuilder<N, Provider, P, Network, EvmConfig, Consensus> {
176 let Self { provider, network, executor, evm_config, consensus, _primitives, .. } = self;
177 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
178 }
179
180 pub fn with_noop_pool(
186 self,
187 ) -> RpcModuleBuilder<N, Provider, NoopTransactionPool, Network, EvmConfig, Consensus> {
188 let Self { provider, executor, network, evm_config, consensus, _primitives, .. } = self;
189 RpcModuleBuilder {
190 provider,
191 executor,
192 network,
193 evm_config,
194 pool: NoopTransactionPool::default(),
195 consensus,
196 _primitives,
197 }
198 }
199
200 pub fn with_network<Net>(
202 self,
203 network: Net,
204 ) -> RpcModuleBuilder<N, Provider, Pool, Net, EvmConfig, Consensus> {
205 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
206 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
207 }
208
209 pub fn with_noop_network(
215 self,
216 ) -> RpcModuleBuilder<N, Provider, Pool, NoopNetwork, EvmConfig, Consensus> {
217 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
218 RpcModuleBuilder {
219 provider,
220 pool,
221 executor,
222 network: NoopNetwork::default(),
223 evm_config,
224 consensus,
225 _primitives,
226 }
227 }
228
229 pub fn with_executor(self, executor: Runtime) -> Self {
231 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
232 Self {
233 provider,
234 network,
235 pool,
236 executor: Some(executor),
237 evm_config,
238 consensus,
239 _primitives,
240 }
241 }
242
243 pub fn with_evm_config<E>(
245 self,
246 evm_config: E,
247 ) -> RpcModuleBuilder<N, Provider, Pool, Network, E, Consensus> {
248 let Self { provider, pool, executor, network, consensus, _primitives, .. } = self;
249 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
250 }
251
252 pub fn with_consensus<C>(
254 self,
255 consensus: C,
256 ) -> RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, C> {
257 let Self { provider, network, pool, executor, evm_config, _primitives, .. } = self;
258 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
259 }
260
261 #[expect(clippy::type_complexity)]
263 pub fn eth_api_builder<ChainSpec>(
264 &self,
265 ) -> EthApiBuilder<
266 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
267 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
268 >
269 where
270 Provider: Clone,
271 Pool: Clone,
272 Network: Clone,
273 EvmConfig: Clone,
274 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
275 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
276 {
277 EthApiBuilder::new(
278 self.provider.clone(),
279 self.pool.clone(),
280 self.network.clone(),
281 self.evm_config.clone(),
282 )
283 }
284
285 #[expect(clippy::type_complexity)]
291 pub fn bootstrap_eth_api<ChainSpec>(
292 &self,
293 ) -> EthApi<
294 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
295 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
296 >
297 where
298 Provider: Clone,
299 Pool: Clone,
300 Network: Clone,
301 EvmConfig: ConfigureEvm + Clone,
302 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
303 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
304 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>: RpcConvert,
305 (): PendingEnvBuilder<EvmConfig>,
306 {
307 self.eth_api_builder().build()
308 }
309}
310
311impl<N, Provider, Pool, Network, EvmConfig, Consensus>
312 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
313where
314 N: NodePrimitives,
315 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
316 + CanonStateSubscriptions<Primitives = N>
317 + ForkChoiceSubscriptions<Header = N::BlockHeader>
318 + PersistedBlockSubscriptions
319 + AccountReader
320 + ChangeSetReader,
321 Pool: TransactionPool + Clone + 'static,
322 Network: NetworkInfo + Peers + Clone + 'static,
323 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
324 Consensus: FullConsensus<N> + Clone + 'static,
325{
326 pub fn build_with_auth_server<EthApi, Payload>(
333 self,
334 module_config: TransportRpcModuleConfig,
335 engine: impl IntoEngineApiRpcModule,
336 eth: EthApi,
337 engine_events: EventSender<ConsensusEngineEvent<N>>,
338 beacon_engine_handle: ConsensusEngineHandle<Payload>,
339 ) -> (
340 TransportRpcModules,
341 AuthRpcModule,
342 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>,
343 )
344 where
345 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
346 Payload: PayloadTypes,
347 {
348 let config = module_config.config.clone().unwrap_or_default();
349
350 let mut registry = self.into_registry(config, eth, engine_events);
351 let modules = registry.create_transport_rpc_modules(module_config);
352 let auth_module = registry.create_auth_module(engine, beacon_engine_handle);
353
354 (modules, auth_module, registry)
355 }
356
357 pub fn into_registry<EthApi>(
362 self,
363 config: RpcModuleConfig,
364 eth: EthApi,
365 engine_events: EventSender<ConsensusEngineEvent<N>>,
366 ) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
367 where
368 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
369 {
370 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
371 let executor =
372 executor.expect("RpcModuleBuilder requires a Runtime to be set via `with_executor`");
373 RpcRegistryInner::new(
374 provider,
375 pool,
376 network,
377 executor,
378 consensus,
379 config,
380 evm_config,
381 eth,
382 engine_events,
383 )
384 }
385
386 pub fn build<EthApi>(
389 self,
390 module_config: TransportRpcModuleConfig,
391 eth: EthApi,
392 engine_events: EventSender<ConsensusEngineEvent<N>>,
393 ) -> TransportRpcModules<()>
394 where
395 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
396 {
397 let mut modules = TransportRpcModules::default();
398
399 if !module_config.is_empty() {
400 let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
401
402 let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
403
404 modules.config = module_config;
405 modules.http = registry.maybe_module(http.as_ref());
406 modules.ws = registry.maybe_module(ws.as_ref());
407 modules.ipc = registry.maybe_module(ipc.as_ref());
408 }
409
410 modules
411 }
412}
413
414impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
415 fn default() -> Self {
416 Self {
417 provider: (),
418 pool: (),
419 network: (),
420 executor: None,
421 evm_config: (),
422 consensus: (),
423 _primitives: PhantomData,
424 }
425 }
426}
427
/// Configuration shared by the RPC modules; it currently wraps only the `eth` settings.
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RpcModuleConfig {
    /// Configuration for the `eth` namespace handlers.
    eth: EthConfig,
}
434
435impl RpcModuleConfig {
438 pub fn builder() -> RpcModuleConfigBuilder {
440 RpcModuleConfigBuilder::default()
441 }
442
443 pub const fn new(eth: EthConfig) -> Self {
445 Self { eth }
446 }
447
448 pub const fn eth(&self) -> &EthConfig {
450 &self.eth
451 }
452
453 pub const fn eth_mut(&mut self) -> &mut EthConfig {
455 &mut self.eth
456 }
457}
458
/// Builder for [`RpcModuleConfig`].
#[derive(Clone, Debug, Default)]
pub struct RpcModuleConfigBuilder {
    /// The `eth` settings, if any were provided; `build` falls back to the default otherwise.
    eth: Option<EthConfig>,
}
464
465impl RpcModuleConfigBuilder {
468 pub fn eth(mut self, eth: EthConfig) -> Self {
470 self.eth = Some(eth);
471 self
472 }
473
474 pub fn build(self) -> RpcModuleConfig {
476 let Self { eth } = self;
477 RpcModuleConfig { eth: eth.unwrap_or_default() }
478 }
479
480 pub const fn get_eth(&self) -> Option<&EthConfig> {
482 self.eth.as_ref()
483 }
484
485 pub const fn eth_mut(&mut self) -> &mut Option<EthConfig> {
487 &mut self.eth
488 }
489
490 pub fn eth_mut_or_default(&mut self) -> &mut EthConfig {
492 self.eth.get_or_insert_with(EthConfig::default)
493 }
494}
495
/// Registry that holds the node components and the RPC handlers created from them, and caches
/// the [`Methods`] of every module that has been instantiated.
#[derive(Debug)]
pub struct RpcRegistryInner<Provider, Pool, Network, EthApi: EthApiTypes, EvmConfig, Consensus> {
    /// The provider component.
    provider: Provider,
    /// The transaction pool component.
    pool: Pool,
    /// The network component.
    network: Network,
    /// Runtime passed to handlers that need one.
    executor: Runtime,
    /// The EVM configuration component.
    evm_config: EvmConfig,
    /// The consensus component.
    consensus: Consensus,
    /// The `eth` namespace handlers.
    eth: EthHandlers<EthApi>,
    /// Guard shared by the tracing-style handlers, created from the configured
    /// `max_tracing_requests`.
    blocking_pool_guard: BlockingTaskGuard,
    /// Methods of every module that has been registered or instantiated so far.
    modules: HashMap<RethRpcModule, Methods>,
    /// The `eth` settings this registry was created with.
    eth_config: EthConfig,
    /// Sender for consensus engine events; handlers obtain listeners from it.
    engine_events:
        EventSender<ConsensusEngineEvent<<EthApi::RpcConvert as RpcConvert>::Primitives>>,
}
517
518impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
521 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
522where
523 N: NodePrimitives,
524 Provider: StateProviderFactory
525 + CanonStateSubscriptions<Primitives = N>
526 + BlockReader<Block = N::Block, Receipt = N::Receipt>
527 + Clone
528 + Unpin
529 + 'static,
530 Pool: Send + Sync + Clone + 'static,
531 Network: Clone + 'static,
532 EthApi: FullEthApiTypes + 'static,
533 EvmConfig: ConfigureEvm<Primitives = N>,
534{
535 #[expect(clippy::too_many_arguments)]
537 pub fn new(
538 provider: Provider,
539 pool: Pool,
540 network: Network,
541 executor: Runtime,
542 consensus: Consensus,
543 config: RpcModuleConfig,
544 evm_config: EvmConfig,
545 eth_api: EthApi,
546 engine_events: EventSender<
547 ConsensusEngineEvent<<EthApi::Provider as NodePrimitivesProvider>::Primitives>,
548 >,
549 ) -> Self
550 where
551 EvmConfig: ConfigureEvm<Primitives = N>,
552 {
553 let blocking_pool_guard = BlockingTaskGuard::new(config.eth.max_tracing_requests);
554
555 let eth = EthHandlers::bootstrap(config.eth.clone(), executor.clone(), eth_api);
556
557 Self {
558 provider,
559 pool,
560 network,
561 eth,
562 executor,
563 consensus,
564 modules: Default::default(),
565 blocking_pool_guard,
566 eth_config: config.eth,
567 evm_config,
568 engine_events,
569 }
570 }
571}
572
573impl<Provider, Pool, Network, EthApi, Evm, Consensus>
574 RpcRegistryInner<Provider, Pool, Network, EthApi, Evm, Consensus>
575where
576 EthApi: EthApiTypes,
577{
578 pub const fn eth_api(&self) -> &EthApi {
580 &self.eth.api
581 }
582
583 pub const fn eth_handlers(&self) -> &EthHandlers<EthApi> {
585 &self.eth
586 }
587
588 pub const fn pool(&self) -> &Pool {
590 &self.pool
591 }
592
593 pub const fn tasks(&self) -> &Runtime {
595 &self.executor
596 }
597
598 pub const fn provider(&self) -> &Provider {
600 &self.provider
601 }
602
603 pub const fn evm_config(&self) -> &Evm {
605 &self.evm_config
606 }
607
608 pub fn methods(&self) -> Vec<Methods> {
610 self.modules.values().cloned().collect()
611 }
612
613 pub fn module(&self) -> RpcModule<()> {
615 let mut module = RpcModule::new(());
616 for methods in self.modules.values().cloned() {
617 module.merge(methods).expect("No conflicts");
618 }
619 module
620 }
621}
622
623impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
624 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
625where
626 Network: NetworkInfo + Clone + 'static,
627 EthApi: EthApiTypes,
628 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks>,
629 EvmConfig: ConfigureEvm,
630{
631 pub fn admin_api(&self) -> AdminApi<Network, Provider::ChainSpec, Pool>
633 where
634 Network: Peers,
635 Pool: TransactionPool + Clone + 'static,
636 {
637 AdminApi::new(self.network.clone(), self.provider.chain_spec(), self.pool.clone())
638 }
639
640 pub fn web3_api(&self) -> Web3Api<Network> {
642 Web3Api::new(self.network.clone())
643 }
644
645 pub fn register_admin(&mut self) -> &mut Self
647 where
648 Network: Peers,
649 Pool: TransactionPool + Clone + 'static,
650 {
651 let adminapi = self.admin_api();
652 self.modules.insert(RethRpcModule::Admin, adminapi.into_rpc().into());
653 self
654 }
655
656 pub fn register_web3(&mut self) -> &mut Self {
658 let web3api = self.web3_api();
659 self.modules.insert(RethRpcModule::Web3, web3api.into_rpc().into());
660 self
661 }
662}
663
664impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
665 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
666where
667 N: NodePrimitives,
668 Provider: FullRpcProvider<
669 Header = N::BlockHeader,
670 Block = N::Block,
671 Receipt = N::Receipt,
672 Transaction = N::SignedTx,
673 > + AccountReader
674 + ChangeSetReader
675 + CanonStateSubscriptions<Primitives = N>
676 + ForkChoiceSubscriptions<Header = N::BlockHeader>
677 + PersistedBlockSubscriptions,
678 Network: NetworkInfo + Peers + Clone + 'static,
679 EthApi: EthApiServer<
680 RpcTxReq<EthApi::NetworkTypes>,
681 RpcTransaction<EthApi::NetworkTypes>,
682 RpcBlock<EthApi::NetworkTypes>,
683 RpcReceipt<EthApi::NetworkTypes>,
684 RpcHeader<EthApi::NetworkTypes>,
685 TxTy<N>,
686 > + EthApiTypes,
687 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
688{
689 pub fn register_eth(&mut self) -> &mut Self {
695 let eth_api = self.eth_api().clone();
696 self.modules.insert(RethRpcModule::Eth, eth_api.into_rpc().into());
697 self
698 }
699
700 pub fn register_ots(&mut self) -> &mut Self
706 where
707 EthApi: TraceExt + EthTransactions<Primitives = N>,
708 {
709 let otterscan_api = self.otterscan_api();
710 self.modules.insert(RethRpcModule::Ots, otterscan_api.into_rpc().into());
711 self
712 }
713
714 pub fn register_debug(&mut self) -> &mut Self
720 where
721 EthApi: EthTransactions + TraceExt,
722 {
723 let debug_api = self.debug_api();
724 self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
725 self
726 }
727
728 pub fn register_trace(&mut self) -> &mut Self
734 where
735 EthApi: TraceExt,
736 {
737 let trace_api = self.trace_api();
738 self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
739 self
740 }
741
742 pub fn register_net(&mut self) -> &mut Self
750 where
751 EthApi: EthApiSpec + 'static,
752 {
753 let netapi = self.net_api();
754 self.modules.insert(RethRpcModule::Net, netapi.into_rpc().into());
755 self
756 }
757
758 pub fn register_reth(&mut self) -> &mut Self {
766 let rethapi = self.reth_api();
767 self.modules.insert(RethRpcModule::Reth, rethapi.into_rpc().into());
768 self
769 }
770
771 pub fn otterscan_api(&self) -> OtterscanApi<EthApi> {
777 let eth_api = self.eth_api().clone();
778 OtterscanApi::new(eth_api)
779 }
780}
781
782impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
783 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
784where
785 N: NodePrimitives,
786 Provider: FullRpcProvider<
787 Block = N::Block,
788 Header = N::BlockHeader,
789 Transaction = N::SignedTx,
790 Receipt = N::Receipt,
791 > + AccountReader
792 + ChangeSetReader,
793 Network: NetworkInfo + Peers + Clone + 'static,
794 EthApi: EthApiTypes,
795 EvmConfig: ConfigureEvm<Primitives = N>,
796{
797 pub fn trace_api(&self) -> TraceApi<EthApi> {
803 TraceApi::new(
804 self.eth_api().clone(),
805 self.blocking_pool_guard.clone(),
806 self.eth_config.clone(),
807 )
808 }
809
810 pub fn bundle_api(&self) -> EthBundle<EthApi>
816 where
817 EthApi: EthTransactions + LoadPendingBlock + Call,
818 {
819 let eth_api = self.eth_api().clone();
820 EthBundle::new(eth_api, self.blocking_pool_guard.clone())
821 }
822
823 pub fn debug_api(&self) -> DebugApi<EthApi>
829 where
830 EthApi: FullEthApiTypes,
831 {
832 DebugApi::new(
833 self.eth_api().clone(),
834 self.blocking_pool_guard.clone(),
835 self.tasks(),
836 self.engine_events.new_listener(),
837 )
838 }
839
840 pub fn net_api(&self) -> NetApi<Network, EthApi>
846 where
847 EthApi: EthApiSpec + 'static,
848 {
849 let eth_api = self.eth_api().clone();
850 NetApi::new(self.network.clone(), eth_api)
851 }
852
853 pub fn reth_api(&self) -> RethApi<Provider, EvmConfig> {
855 RethApi::new(
856 self.provider.clone(),
857 self.evm_config.clone(),
858 self.blocking_pool_guard.clone(),
859 self.executor.clone(),
860 )
861 }
862}
863
864impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
865 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
866where
867 N: NodePrimitives,
868 Provider: FullRpcProvider<Block = N::Block>
869 + CanonStateSubscriptions<Primitives = N>
870 + ForkChoiceSubscriptions<Header = N::BlockHeader>
871 + PersistedBlockSubscriptions
872 + AccountReader
873 + ChangeSetReader,
874 Pool: TransactionPool + Clone + 'static,
875 Network: NetworkInfo + Peers + Clone + 'static,
876 EthApi: FullEthApiServer,
877 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
878 Consensus: FullConsensus<N> + Clone + 'static,
879{
880 pub fn create_auth_module<Payload>(
887 &self,
888 engine_api: impl IntoEngineApiRpcModule,
889 beacon_engine_handle: ConsensusEngineHandle<Payload>,
890 ) -> AuthRpcModule
891 where
892 Payload: PayloadTypes,
893 {
894 let mut module = engine_api.into_rpc_module();
895
896 let reth_engine_api = RethEngineApi::new(beacon_engine_handle);
898 module
899 .merge(RethEngineApiServer::into_rpc(reth_engine_api).remove_context())
900 .expect("No conflicting methods");
901
902 let eth_handlers = self.eth_handlers();
904 let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
905
906 module.merge(engine_eth.into_rpc()).expect("No conflicting methods");
907
908 AuthRpcModule { inner: module }
909 }
910
911 fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
913 config.map(|config| self.module_for(config))
914 }
915
916 pub fn create_transport_rpc_modules(
920 &mut self,
921 config: TransportRpcModuleConfig,
922 ) -> TransportRpcModules<()> {
923 let mut modules = TransportRpcModules::default();
924 let http = self.maybe_module(config.http.as_ref());
925 let ws = self.maybe_module(config.ws.as_ref());
926 let ipc = self.maybe_module(config.ipc.as_ref());
927
928 modules.config = config;
929 modules.http = http;
930 modules.ws = ws;
931 modules.ipc = ipc;
932 modules
933 }
934
935 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
938 let mut module = RpcModule::new(());
939 let all_methods = self.reth_methods(config.iter_selection());
940 for methods in all_methods {
941 module.merge(methods).expect("No conflicts");
942 }
943 module
944 }
945
946 pub fn reth_methods(
955 &mut self,
956 namespaces: impl Iterator<Item = RethRpcModule>,
957 ) -> Vec<Methods> {
958 let EthHandlers { api: eth_api, filter: eth_filter, pubsub: eth_pubsub, .. } =
959 self.eth_handlers().clone();
960
961 let namespaces: Vec<_> = namespaces.collect();
963 namespaces
964 .iter()
965 .map(|namespace| {
966 self.modules
967 .entry(namespace.clone())
968 .or_insert_with(|| match namespace.clone() {
969 RethRpcModule::Admin => AdminApi::new(
970 self.network.clone(),
971 self.provider.chain_spec(),
972 self.pool.clone(),
973 )
974 .into_rpc()
975 .into(),
976 RethRpcModule::Debug => DebugApi::new(
977 eth_api.clone(),
978 self.blocking_pool_guard.clone(),
979 &self.executor,
980 self.engine_events.new_listener(),
981 )
982 .into_rpc()
983 .into(),
984 RethRpcModule::Eth => {
985 let mut module = eth_api.clone().into_rpc();
987 module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
988 module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
989 module
990 .merge(
991 EthBundle::new(
992 eth_api.clone(),
993 self.blocking_pool_guard.clone(),
994 )
995 .into_rpc(),
996 )
997 .expect("No conflicts");
998
999 module.into()
1000 }
1001 RethRpcModule::Net => {
1002 NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
1003 }
1004 RethRpcModule::Trace => TraceApi::new(
1005 eth_api.clone(),
1006 self.blocking_pool_guard.clone(),
1007 self.eth_config.clone(),
1008 )
1009 .into_rpc()
1010 .into(),
1011 RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
1012 RethRpcModule::Txpool => TxPoolApi::new(
1013 self.eth.api.pool().clone(),
1014 dyn_clone::clone(self.eth.api.converter()),
1015 )
1016 .into_rpc()
1017 .into(),
1018 RethRpcModule::Rpc => RPCApi::new(
1019 namespaces
1020 .iter()
1021 .map(|module| (module.to_string(), "1.0".to_string()))
1022 .collect(),
1023 )
1024 .into_rpc()
1025 .into(),
1026 RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
1027 RethRpcModule::Reth => RethApi::new(
1028 self.provider.clone(),
1029 self.evm_config.clone(),
1030 self.blocking_pool_guard.clone(),
1031 self.executor.clone(),
1032 )
1033 .into_rpc()
1034 .into(),
1035 RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
1036 RethRpcModule::Mev => {
1037 EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
1038 .into_rpc()
1039 .into()
1040 }
1041 RethRpcModule::Flashbots |
1045 RethRpcModule::Testing |
1046 RethRpcModule::Other(_) => Default::default(),
1047 })
1048 .clone()
1049 })
1050 .collect::<Vec<_>>()
1051 }
1052}
1053
1054impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus> Clone
1055 for RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
1056where
1057 EthApi: EthApiTypes,
1058 Provider: Clone,
1059 Pool: Clone,
1060 Network: Clone,
1061 EvmConfig: Clone,
1062 Consensus: Clone,
1063{
1064 fn clone(&self) -> Self {
1065 Self {
1066 provider: self.provider.clone(),
1067 pool: self.pool.clone(),
1068 network: self.network.clone(),
1069 executor: self.executor.clone(),
1070 evm_config: self.evm_config.clone(),
1071 consensus: self.consensus.clone(),
1072 eth: self.eth.clone(),
1073 blocking_pool_guard: self.blocking_pool_guard.clone(),
1074 modules: self.modules.clone(),
1075 eth_config: self.eth_config.clone(),
1076 engine_events: self.engine_events.clone(),
1077 }
1078 }
1079}
1080
/// Configuration for the http, ws and ipc RPC servers.
///
/// A transport counts as configured once its `*_server_config` is `Some` (see `has_server`).
#[derive(Debug)]
pub struct RpcServerConfig<RpcMiddleware = Identity> {
    /// Server settings for http; `None` means no http server is configured.
    http_server_config: Option<ServerConfigBuilder>,
    /// CORS domains for http, if any.
    http_cors_domains: Option<String>,
    /// Socket address for http; `start` falls back to localhost with
    /// `constants::DEFAULT_HTTP_RPC_PORT` when unset.
    http_addr: Option<SocketAddr>,
    /// Whether http response compression is disabled.
    http_disable_compression: bool,
    /// Server settings for ws; `None` means no ws server is configured.
    ws_server_config: Option<ServerConfigBuilder>,
    /// CORS domains for ws, if any.
    ws_cors_domains: Option<String>,
    /// Socket address for ws; `start` falls back to localhost with
    /// `constants::DEFAULT_WS_RPC_PORT` when unset.
    ws_addr: Option<SocketAddr>,
    /// Server settings for ipc; `None` means no ipc server is configured.
    ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
    /// Endpoint for ipc; `start` falls back to `constants::DEFAULT_IPC_ENDPOINT` when unset.
    ipc_endpoint: Option<String>,
    /// Optional JWT secret.
    jwt_secret: Option<JwtSecret>,
    /// The RPC middleware.
    rpc_middleware: RpcMiddleware,
}
1117
1118impl Default for RpcServerConfig<Identity> {
1121 fn default() -> Self {
1123 Self {
1124 http_server_config: None,
1125 http_cors_domains: None,
1126 http_addr: None,
1127 http_disable_compression: false,
1128 ws_server_config: None,
1129 ws_cors_domains: None,
1130 ws_addr: None,
1131 ipc_server_config: None,
1132 ipc_endpoint: None,
1133 jwt_secret: None,
1134 rpc_middleware: Default::default(),
1135 }
1136 }
1137}
1138
1139impl RpcServerConfig {
1140 pub fn http(config: ServerConfigBuilder) -> Self {
1142 Self::default().with_http(config)
1143 }
1144
1145 pub fn ws(config: ServerConfigBuilder) -> Self {
1147 Self::default().with_ws(config)
1148 }
1149
1150 pub fn ipc(config: IpcServerBuilder<Identity, Identity>) -> Self {
1152 Self::default().with_ipc(config)
1153 }
1154
1155 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
1160 self.http_server_config =
1161 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1162 self
1163 }
1164
1165 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
1170 self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1171 self
1172 }
1173
1174 pub fn with_ipc(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
1179 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1180 self
1181 }
1182}
1183
1184impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
1185 pub fn set_rpc_middleware<T>(self, rpc_middleware: T) -> RpcServerConfig<T> {
1187 RpcServerConfig {
1188 http_server_config: self.http_server_config,
1189 http_cors_domains: self.http_cors_domains,
1190 http_addr: self.http_addr,
1191 http_disable_compression: self.http_disable_compression,
1192 ws_server_config: self.ws_server_config,
1193 ws_cors_domains: self.ws_cors_domains,
1194 ws_addr: self.ws_addr,
1195 ipc_server_config: self.ipc_server_config,
1196 ipc_endpoint: self.ipc_endpoint,
1197 jwt_secret: self.jwt_secret,
1198 rpc_middleware,
1199 }
1200 }
1201
1202 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
1204 self.with_http_cors(cors_domain.clone()).with_ws_cors(cors_domain)
1205 }
1206
1207 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
1209 self.ws_cors_domains = cors_domain;
1210 self
1211 }
1212
    /// Stores the flag that controls whether http response compression is disabled.
    pub const fn with_http_disable_compression(mut self, http_disable_compression: bool) -> Self {
        self.http_disable_compression = http_disable_compression;
        self
    }
1218
1219 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
1221 self.http_cors_domains = cors_domain;
1222 self
1223 }
1224
    /// Sets the socket address for the http server.
    ///
    /// This only stores the address; it does not by itself configure an http server.
    pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
        self.http_addr = Some(addr);
        self
    }
1233
    /// Sets the socket address for the ws server.
    ///
    /// This only stores the address; it does not by itself configure a ws server.
    pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
        self.ws_addr = Some(addr);
        self
    }
1242
1243 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
1247 where
1248 I: IdProvider + Clone + 'static,
1249 {
1250 if let Some(config) = self.http_server_config {
1251 self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
1252 }
1253 if let Some(config) = self.ws_server_config {
1254 self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
1255 }
1256 if let Some(ipc) = self.ipc_server_config {
1257 self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
1258 }
1259
1260 self
1261 }
1262
1263 pub fn with_ipc_endpoint(mut self, path: impl Into<String>) -> Self {
1267 self.ipc_endpoint = Some(path.into());
1268 self
1269 }
1270
    /// Sets the optional JWT secret, replacing any previous value.
    pub const fn with_jwt_secret(mut self, secret: Option<JwtSecret>) -> Self {
        self.jwt_secret = secret;
        self
    }
1276
1277 pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
1279 let Some(tokio_runtime) = tokio_runtime else { return self };
1280 if let Some(http_server_config) = self.http_server_config {
1281 self.http_server_config =
1282 Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1283 }
1284 if let Some(ws_server_config) = self.ws_server_config {
1285 self.ws_server_config =
1286 Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1287 }
1288 if let Some(ipc_server_config) = self.ipc_server_config {
1289 self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
1290 }
1291 self
1292 }
1293
1294 pub const fn has_server(&self) -> bool {
1298 self.http_server_config.is_some() ||
1299 self.ws_server_config.is_some() ||
1300 self.ipc_server_config.is_some()
1301 }
1302
    /// Returns the configured http socket address, if one was set.
    pub const fn http_address(&self) -> Option<SocketAddr> {
        self.http_addr
    }
1307
    /// Returns the configured ws socket address, if one was set.
    pub const fn ws_address(&self) -> Option<SocketAddr> {
        self.ws_addr
    }
1312
1313 pub fn ipc_endpoint(&self) -> Option<String> {
1315 self.ipc_endpoint.clone()
1316 }
1317
1318 fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
1320 cors.as_deref().map(cors::create_cors_layer).transpose()
1321 }
1322
1323 fn maybe_jwt_layer(jwt_secret: Option<JwtSecret>) -> Option<AuthLayer<JwtAuthValidator>> {
1325 jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
1326 }
1327
1328 fn maybe_compression_layer(disable_compression: bool) -> Option<CompressionLayer> {
1331 if disable_compression {
1332 None
1333 } else {
1334 Some(CompressionLayer::new())
1335 }
1336 }
1337
1338 pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError>
1344 where
1345 RpcMiddleware: RethRpcMiddleware,
1346 {
1347 let mut http_handle = None;
1348 let mut ws_handle = None;
1349 let mut ipc_handle = None;
1350
1351 let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
1352 Ipv4Addr::LOCALHOST,
1353 constants::DEFAULT_HTTP_RPC_PORT,
1354 )));
1355
1356 let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
1357 Ipv4Addr::LOCALHOST,
1358 constants::DEFAULT_WS_RPC_PORT,
1359 )));
1360
1361 let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
1362 let ipc_path =
1363 self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());
1364
1365 if let Some(builder) = self.ipc_server_config {
1366 let ipc = builder
1367 .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
1368 .build(ipc_path);
1369 ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
1370 }
1371
1372 if self.http_addr == self.ws_addr &&
1374 self.http_server_config.is_some() &&
1375 self.ws_server_config.is_some()
1376 {
1377 let cors = match (self.ws_cors_domains.as_ref(), self.http_cors_domains.as_ref()) {
1378 (Some(ws_cors), Some(http_cors)) => {
1379 if ws_cors.trim() != http_cors.trim() {
1380 return Err(WsHttpSamePortError::ConflictingCorsDomains {
1381 http_cors_domains: Some(http_cors.clone()),
1382 ws_cors_domains: Some(ws_cors.clone()),
1383 }
1384 .into());
1385 }
1386 Some(ws_cors)
1387 }
1388 (a, b) => a.or(b),
1389 }
1390 .cloned();
1391
1392 modules.config.ensure_ws_http_identical()?;
1394
1395 if let Some(config) = self.http_server_config {
1396 let server = ServerBuilder::new()
1397 .set_http_middleware(
1398 tower::ServiceBuilder::new()
1399 .option_layer(Self::maybe_cors_layer(cors)?)
1400 .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
1401 .option_layer(Self::maybe_compression_layer(
1402 self.http_disable_compression,
1403 )),
1404 )
1405 .set_rpc_middleware(
1406 RpcServiceBuilder::default()
1407 .layer(
1408 modules
1409 .http
1410 .as_ref()
1411 .or(modules.ws.as_ref())
1412 .map(RpcRequestMetrics::same_port)
1413 .unwrap_or_default(),
1414 )
1415 .layer(self.rpc_middleware.clone()),
1416 )
1417 .set_config(config.build())
1418 .build(http_socket_addr)
1419 .await
1420 .map_err(|err| {
1421 RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
1422 })?;
1423 let addr = server.local_addr().map_err(|err| {
1424 RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
1425 })?;
1426 if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
1427 let handle = server.start(module.clone());
1428 http_handle = Some(handle.clone());
1429 ws_handle = Some(handle);
1430 }
1431 return Ok(RpcServerHandle {
1432 http_local_addr: Some(addr),
1433 ws_local_addr: Some(addr),
1434 http: http_handle,
1435 ws: ws_handle,
1436 ipc_endpoint: self.ipc_endpoint.clone(),
1437 ipc: ipc_handle,
1438 jwt_secret: self.jwt_secret,
1439 });
1440 }
1441 }
1442
1443 let mut ws_local_addr = None;
1444 let mut ws_server = None;
1445 let mut http_local_addr = None;
1446 let mut http_server = None;
1447
1448 if let Some(config) = self.ws_server_config {
1449 let server = ServerBuilder::new()
1450 .set_config(config.ws_only().build())
1451 .set_http_middleware(
1452 tower::ServiceBuilder::new()
1453 .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
1454 .option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
1455 )
1456 .set_rpc_middleware(
1457 RpcServiceBuilder::default()
1458 .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default())
1459 .layer(self.rpc_middleware.clone()),
1460 )
1461 .build(ws_socket_addr)
1462 .await
1463 .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
1464
1465 let addr = server
1466 .local_addr()
1467 .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
1468
1469 ws_local_addr = Some(addr);
1470 ws_server = Some(server);
1471 }
1472
1473 if let Some(config) = self.http_server_config {
1474 let server = ServerBuilder::new()
1475 .set_config(config.http_only().build())
1476 .set_http_middleware(
1477 tower::ServiceBuilder::new()
1478 .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
1479 .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
1480 .option_layer(Self::maybe_compression_layer(self.http_disable_compression)),
1481 )
1482 .set_rpc_middleware(
1483 RpcServiceBuilder::default()
1484 .layer(
1485 modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
1486 )
1487 .layer(self.rpc_middleware.clone()),
1488 )
1489 .build(http_socket_addr)
1490 .await
1491 .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
1492 let local_addr = server
1493 .local_addr()
1494 .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
1495 http_local_addr = Some(local_addr);
1496 http_server = Some(server);
1497 }
1498
1499 http_handle = http_server
1500 .map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
1501 ws_handle = ws_server
1502 .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
1503 Ok(RpcServerHandle {
1504 http_local_addr,
1505 ws_local_addr,
1506 http: http_handle,
1507 ws: ws_handle,
1508 ipc_endpoint: self.ipc_endpoint.clone(),
1509 ipc: ipc_handle,
1510 jwt_secret: self.jwt_secret,
1511 })
1512 }
1513}
1514
1515#[derive(Debug, Clone, Default, Eq, PartialEq)]
1527pub struct TransportRpcModuleConfig {
1528 http: Option<RpcModuleSelection>,
1530 ws: Option<RpcModuleSelection>,
1532 ipc: Option<RpcModuleSelection>,
1534 config: Option<RpcModuleConfig>,
1536}
1537
1538impl TransportRpcModuleConfig {
1541 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
1543 Self::default().with_http(http)
1544 }
1545
1546 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
1548 Self::default().with_ws(ws)
1549 }
1550
1551 pub fn set_ipc(ipc: impl Into<RpcModuleSelection>) -> Self {
1553 Self::default().with_ipc(ipc)
1554 }
1555
1556 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
1558 self.http = Some(http.into());
1559 self
1560 }
1561
1562 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
1564 self.ws = Some(ws.into());
1565 self
1566 }
1567
1568 pub fn with_ipc(mut self, ipc: impl Into<RpcModuleSelection>) -> Self {
1570 self.ipc = Some(ipc.into());
1571 self
1572 }
1573
1574 pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
1576 self.config = Some(config);
1577 self
1578 }
1579
1580 pub const fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1582 &mut self.http
1583 }
1584
1585 pub const fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1587 &mut self.ws
1588 }
1589
1590 pub const fn ipc_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1592 &mut self.ipc
1593 }
1594
1595 pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
1597 &mut self.config
1598 }
1599
1600 pub const fn is_empty(&self) -> bool {
1602 self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
1603 }
1604
1605 pub const fn http(&self) -> Option<&RpcModuleSelection> {
1607 self.http.as_ref()
1608 }
1609
1610 pub const fn ws(&self) -> Option<&RpcModuleSelection> {
1612 self.ws.as_ref()
1613 }
1614
1615 pub const fn ipc(&self) -> Option<&RpcModuleSelection> {
1617 self.ipc.as_ref()
1618 }
1619
1620 pub const fn config(&self) -> Option<&RpcModuleConfig> {
1622 self.config.as_ref()
1623 }
1624
1625 pub fn contains_any(&self, module: &RethRpcModule) -> bool {
1627 self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
1628 }
1629
1630 pub fn contains_http(&self, module: &RethRpcModule) -> bool {
1632 self.http.as_ref().is_some_and(|http| http.contains(module))
1633 }
1634
1635 pub fn contains_ws(&self, module: &RethRpcModule) -> bool {
1637 self.ws.as_ref().is_some_and(|ws| ws.contains(module))
1638 }
1639
1640 pub fn contains_ipc(&self, module: &RethRpcModule) -> bool {
1642 self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
1643 }
1644
1645 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
1648 if RpcModuleSelection::are_identical(self.http.as_ref(), self.ws.as_ref()) {
1649 Ok(())
1650 } else {
1651 let http_modules =
1652 self.http.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1653 let ws_modules =
1654 self.ws.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1655
1656 let http_not_ws = http_modules.difference(&ws_modules).cloned().collect();
1657 let ws_not_http = ws_modules.difference(&http_modules).cloned().collect();
1658 let overlap = http_modules.intersection(&ws_modules).cloned().collect();
1659
1660 Err(WsHttpSamePortError::ConflictingModules(Box::new(ConflictingModules {
1661 overlap,
1662 http_not_ws,
1663 ws_not_http,
1664 })))
1665 }
1666 }
1667}
1668
1669#[derive(Debug, Clone, Default)]
1671pub struct TransportRpcModules<Context = ()> {
1672 config: TransportRpcModuleConfig,
1674 http: Option<RpcModule<Context>>,
1676 ws: Option<RpcModule<Context>>,
1678 ipc: Option<RpcModule<Context>>,
1680}
1681
1682impl TransportRpcModules {
1685 pub fn with_config(mut self, config: TransportRpcModuleConfig) -> Self {
1688 self.config = config;
1689 self
1690 }
1691
1692 pub fn with_http(mut self, http: RpcModule<()>) -> Self {
1695 self.http = Some(http);
1696 self
1697 }
1698
1699 pub fn with_ws(mut self, ws: RpcModule<()>) -> Self {
1702 self.ws = Some(ws);
1703 self
1704 }
1705
1706 pub fn with_ipc(mut self, ipc: RpcModule<()>) -> Self {
1709 self.ipc = Some(ipc);
1710 self
1711 }
1712
1713 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
1715 &self.config
1716 }
1717
1718 pub fn merge_if_module_configured(
1723 &mut self,
1724 module: RethRpcModule,
1725 other: impl Into<Methods>,
1726 ) -> Result<(), RegisterMethodError> {
1727 let other = other.into();
1728 if self.module_config().contains_http(&module) {
1729 self.merge_http(other.clone())?;
1730 }
1731 if self.module_config().contains_ws(&module) {
1732 self.merge_ws(other.clone())?;
1733 }
1734 if self.module_config().contains_ipc(&module) {
1735 self.merge_ipc(other)?;
1736 }
1737
1738 Ok(())
1739 }
1740
1741 pub fn merge_if_module_configured_with<F>(
1748 &mut self,
1749 module: RethRpcModule,
1750 f: F,
1751 ) -> Result<(), RegisterMethodError>
1752 where
1753 F: FnOnce() -> Methods,
1754 {
1755 if !self.module_config().contains_any(&module) {
1757 return Ok(());
1758 }
1759 self.merge_if_module_configured(module, f())
1760 }
1761
1762 pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1768 if let Some(ref mut http) = self.http {
1769 return http.merge(other.into()).map(|_| true)
1770 }
1771 Ok(false)
1772 }
1773
1774 pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1780 if let Some(ref mut ws) = self.ws {
1781 return ws.merge(other.into()).map(|_| true)
1782 }
1783 Ok(false)
1784 }
1785
1786 pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1792 if let Some(ref mut ipc) = self.ipc {
1793 return ipc.merge(other.into()).map(|_| true)
1794 }
1795 Ok(false)
1796 }
1797
1798 pub fn merge_configured(
1802 &mut self,
1803 other: impl Into<Methods>,
1804 ) -> Result<(), RegisterMethodError> {
1805 let other = other.into();
1806 self.merge_http(other.clone())?;
1807 self.merge_ws(other.clone())?;
1808 self.merge_ipc(other)?;
1809 Ok(())
1810 }
1811
1812 pub fn methods_by_module(&self, module: RethRpcModule) -> Methods {
1816 self.methods_by(|name| name.starts_with(module.as_str()))
1817 }
1818
1819 pub fn methods_by<F>(&self, mut filter: F) -> Methods
1823 where
1824 F: FnMut(&str) -> bool,
1825 {
1826 let mut methods = Methods::new();
1827
1828 let mut f =
1830 |name: &str, mm: &Methods| filter(name) && !mm.method_names().any(|m| m == name);
1831
1832 if let Some(m) = self.http_methods(|name| f(name, &methods)) {
1833 let _ = methods.merge(m);
1834 }
1835 if let Some(m) = self.ws_methods(|name| f(name, &methods)) {
1836 let _ = methods.merge(m);
1837 }
1838 if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
1839 let _ = methods.merge(m);
1840 }
1841 methods
1842 }
1843
1844 pub fn http_methods<F>(&self, filter: F) -> Option<Methods>
1848 where
1849 F: FnMut(&str) -> bool,
1850 {
1851 self.http.as_ref().map(|module| methods_by(module, filter))
1852 }
1853
1854 pub fn ws_methods<F>(&self, filter: F) -> Option<Methods>
1858 where
1859 F: FnMut(&str) -> bool,
1860 {
1861 self.ws.as_ref().map(|module| methods_by(module, filter))
1862 }
1863
1864 pub fn ipc_methods<F>(&self, filter: F) -> Option<Methods>
1868 where
1869 F: FnMut(&str) -> bool,
1870 {
1871 self.ipc.as_ref().map(|module| methods_by(module, filter))
1872 }
1873
1874 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
1882 if let Some(http_module) = &mut self.http {
1883 http_module.remove_method(method_name).is_some()
1884 } else {
1885 false
1886 }
1887 }
1888
1889 pub fn remove_http_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1891 for name in methods {
1892 self.remove_http_method(name);
1893 }
1894 }
1895
1896 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
1904 if let Some(ws_module) = &mut self.ws {
1905 ws_module.remove_method(method_name).is_some()
1906 } else {
1907 false
1908 }
1909 }
1910
1911 pub fn remove_ws_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1913 for name in methods {
1914 self.remove_ws_method(name);
1915 }
1916 }
1917
1918 pub fn remove_ipc_method(&mut self, method_name: &'static str) -> bool {
1926 if let Some(ipc_module) = &mut self.ipc {
1927 ipc_module.remove_method(method_name).is_some()
1928 } else {
1929 false
1930 }
1931 }
1932
1933 pub fn remove_ipc_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1935 for name in methods {
1936 self.remove_ipc_method(name);
1937 }
1938 }
1939
1940 pub fn remove_method_from_configured(&mut self, method_name: &'static str) -> bool {
1944 let http_removed = self.remove_http_method(method_name);
1945 let ws_removed = self.remove_ws_method(method_name);
1946 let ipc_removed = self.remove_ipc_method(method_name);
1947
1948 http_removed || ws_removed || ipc_removed
1949 }
1950
1951 pub fn rename(
1955 &mut self,
1956 old_name: &'static str,
1957 new_method: impl Into<Methods>,
1958 ) -> Result<(), RegisterMethodError> {
1959 self.remove_method_from_configured(old_name);
1961
1962 self.merge_configured(new_method)
1964 }
1965
1966 pub fn replace_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1973 let other = other.into();
1974 self.remove_http_methods(other.method_names());
1975 self.merge_http(other)
1976 }
1977
1978 pub fn replace_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1985 let other = other.into();
1986 self.remove_ipc_methods(other.method_names());
1987 self.merge_ipc(other)
1988 }
1989
1990 pub fn replace_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1997 let other = other.into();
1998 self.remove_ws_methods(other.method_names());
1999 self.merge_ws(other)
2000 }
2001
2002 pub fn replace_configured(
2006 &mut self,
2007 other: impl Into<Methods>,
2008 ) -> Result<bool, RegisterMethodError> {
2009 let other = other.into();
2010 self.replace_http(other.clone())?;
2011 self.replace_ws(other.clone())?;
2012 self.replace_ipc(other)?;
2013 Ok(true)
2014 }
2015
2016 pub fn add_or_replace_http(
2020 &mut self,
2021 other: impl Into<Methods>,
2022 ) -> Result<bool, RegisterMethodError> {
2023 let other = other.into();
2024 self.remove_http_methods(other.method_names());
2025 self.merge_http(other)
2026 }
2027
2028 pub fn add_or_replace_ws(
2032 &mut self,
2033 other: impl Into<Methods>,
2034 ) -> Result<bool, RegisterMethodError> {
2035 let other = other.into();
2036 self.remove_ws_methods(other.method_names());
2037 self.merge_ws(other)
2038 }
2039
2040 pub fn add_or_replace_ipc(
2044 &mut self,
2045 other: impl Into<Methods>,
2046 ) -> Result<bool, RegisterMethodError> {
2047 let other = other.into();
2048 self.remove_ipc_methods(other.method_names());
2049 self.merge_ipc(other)
2050 }
2051
2052 pub fn add_or_replace_configured(
2054 &mut self,
2055 other: impl Into<Methods>,
2056 ) -> Result<(), RegisterMethodError> {
2057 let other = other.into();
2058 self.add_or_replace_http(other.clone())?;
2059 self.add_or_replace_ws(other.clone())?;
2060 self.add_or_replace_ipc(other)?;
2061 Ok(())
2062 }
2063 pub fn add_or_replace_if_module_configured(
2066 &mut self,
2067 module: RethRpcModule,
2068 other: impl Into<Methods>,
2069 ) -> Result<(), RegisterMethodError> {
2070 let other = other.into();
2071 if self.module_config().contains_http(&module) {
2072 self.add_or_replace_http(other.clone())?;
2073 }
2074 if self.module_config().contains_ws(&module) {
2075 self.add_or_replace_ws(other.clone())?;
2076 }
2077 if self.module_config().contains_ipc(&module) {
2078 self.add_or_replace_ipc(other)?;
2079 }
2080 Ok(())
2081 }
2082}
2083
2084fn methods_by<T, F>(module: &RpcModule<T>, mut filter: F) -> Methods
2086where
2087 F: FnMut(&str) -> bool,
2088{
2089 let mut methods = Methods::new();
2090 let method_names = module.method_names().filter(|name| filter(name));
2091
2092 for name in method_names {
2093 if let Some(matched_method) = module.method(name).cloned() {
2094 let _ = methods.verify_and_insert(name, matched_method);
2095 }
2096 }
2097
2098 methods
2099}
2100
2101#[derive(Clone, Debug)]
2106#[must_use = "Server stops if dropped"]
2107pub struct RpcServerHandle {
2108 http_local_addr: Option<SocketAddr>,
2110 ws_local_addr: Option<SocketAddr>,
2111 http: Option<ServerHandle>,
2112 ws: Option<ServerHandle>,
2113 ipc_endpoint: Option<String>,
2114 ipc: Option<jsonrpsee::server::ServerHandle>,
2115 jwt_secret: Option<JwtSecret>,
2116}
2117
2118impl RpcServerHandle {
2121 fn bearer_token(&self) -> Option<String> {
2123 self.jwt_secret.as_ref().map(|secret| {
2124 format!(
2125 "Bearer {}",
2126 secret
2127 .encode(&Claims {
2128 iat: (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +
2129 Duration::from_secs(60))
2130 .as_secs(),
2131 exp: None,
2132 })
2133 .unwrap()
2134 )
2135 })
2136 }
2137 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
2139 self.http_local_addr
2140 }
2141
2142 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
2144 self.ws_local_addr
2145 }
2146
2147 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
2149 if let Some(handle) = self.http {
2150 handle.stop()?
2151 }
2152
2153 if let Some(handle) = self.ws {
2154 handle.stop()?
2155 }
2156
2157 if let Some(handle) = self.ipc {
2158 handle.stop()?
2159 }
2160
2161 Ok(())
2162 }
2163
2164 pub fn ipc_endpoint(&self) -> Option<String> {
2166 self.ipc_endpoint.clone()
2167 }
2168
2169 pub fn http_url(&self) -> Option<String> {
2171 self.http_local_addr.map(|addr| format!("http://{addr}"))
2172 }
2173
2174 pub fn ws_url(&self) -> Option<String> {
2176 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
2177 }
2178
2179 pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
2181 let url = self.http_url()?;
2182
2183 let client = if let Some(token) = self.bearer_token() {
2184 jsonrpsee::http_client::HttpClientBuilder::default()
2185 .set_headers(HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]))
2186 .build(url)
2187 } else {
2188 jsonrpsee::http_client::HttpClientBuilder::default().build(url)
2189 };
2190
2191 client.expect("failed to create http client").into()
2192 }
2193
2194 pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
2196 let url = self.ws_url()?;
2197 let mut builder = jsonrpsee::ws_client::WsClientBuilder::default();
2198
2199 if let Some(token) = self.bearer_token() {
2200 let headers = HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]);
2201 builder = builder.set_headers(headers);
2202 }
2203
2204 let client = builder.build(url).await.expect("failed to create ws client");
2205 Some(client)
2206 }
2207
2208 pub fn eth_http_provider(
2210 &self,
2211 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2212 self.new_http_provider_for()
2213 }
2214
2215 pub fn eth_http_provider_with_wallet<W>(
2218 &self,
2219 wallet: W,
2220 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2221 where
2222 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2223 {
2224 let rpc_url = self.http_url()?;
2225 let provider =
2226 ProviderBuilder::new().wallet(wallet).connect_http(rpc_url.parse().expect("valid url"));
2227 Some(provider)
2228 }
2229
2230 pub fn new_http_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2235 where
2236 N: RecommendedFillers<RecommendedFillers: Unpin>,
2237 {
2238 let rpc_url = self.http_url()?;
2239 let provider = ProviderBuilder::default()
2240 .with_recommended_fillers()
2241 .connect_http(rpc_url.parse().expect("valid url"));
2242 Some(provider)
2243 }
2244
2245 pub async fn eth_ws_provider(
2247 &self,
2248 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2249 self.new_ws_provider_for().await
2250 }
2251
2252 pub async fn eth_ws_provider_with_wallet<W>(
2255 &self,
2256 wallet: W,
2257 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2258 where
2259 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2260 {
2261 let rpc_url = self.ws_url()?;
2262 let provider = ProviderBuilder::new()
2263 .wallet(wallet)
2264 .connect(&rpc_url)
2265 .await
2266 .expect("failed to create ws client");
2267 Some(provider)
2268 }
2269
2270 pub async fn new_ws_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2275 where
2276 N: RecommendedFillers<RecommendedFillers: Unpin>,
2277 {
2278 let rpc_url = self.ws_url()?;
2279 let provider = ProviderBuilder::default()
2280 .with_recommended_fillers()
2281 .connect(&rpc_url)
2282 .await
2283 .expect("failed to create ws client");
2284 Some(provider)
2285 }
2286
2287 pub async fn eth_ipc_provider(
2289 &self,
2290 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2291 self.new_ipc_provider_for().await
2292 }
2293
2294 pub async fn new_ipc_provider_for<N>(
2299 &self,
2300 ) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2301 where
2302 N: RecommendedFillers<RecommendedFillers: Unpin>,
2303 {
2304 let rpc_url = self.ipc_endpoint()?;
2305 let provider = ProviderBuilder::default()
2306 .with_recommended_fillers()
2307 .connect(&rpc_url)
2308 .await
2309 .expect("failed to create ipc client");
2310 Some(provider)
2311 }
2312}
2313
2314#[cfg(test)]
2315mod tests {
2316 use super::*;
2317
/// A comma separated list of module names parses into the matching selection.
#[test]
fn parse_eth_call_bundle_selection() {
    let parsed: RpcModuleSelection = "eth,admin,debug".parse().unwrap();
    let expected = RpcModuleSelection::Selection(
        [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Debug].into(),
    );
    assert_eq!(parsed, expected);
}
2328
/// The keyword `all` parses into [`RpcModuleSelection::All`].
#[test]
fn parse_rpc_module_selection() {
    let parsed: RpcModuleSelection = "all".parse().unwrap();
    assert_eq!(parsed, RpcModuleSelection::All);
}
2334
/// The keyword `none` parses into an empty selection.
#[test]
fn parse_rpc_module_selection_none() {
    let parsed: RpcModuleSelection = "none".parse().unwrap();
    assert_eq!(parsed, RpcModuleSelection::Selection(Default::default()));
}
2340
/// Module names that appear more than once end up in the selection only once.
#[test]
fn parse_rpc_unique_module_selection() {
    let parsed: RpcModuleSelection = "eth,admin,eth,net".parse().unwrap();
    let expected = RpcModuleSelection::Selection(
        [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Net].into(),
    );
    assert_eq!(parsed, expected);
}
2351
/// Exercises `RpcModuleSelection::are_identical` for equal, different and missing
/// selections.
#[test]
fn identical_selection() {
    let all = RpcModuleSelection::All;
    let standard = RpcModuleSelection::Standard;
    let empty = RpcModuleSelection::Selection(Default::default());

    // Same variant on both sides.
    assert!(RpcModuleSelection::are_identical(Some(&all), Some(&all)));
    // Different sets of modules.
    assert!(!RpcModuleSelection::are_identical(Some(&all), Some(&standard)));
    // An explicit selection equal to the standard set is identical to `Standard`.
    let explicit_standard = RpcModuleSelection::Selection(standard.to_selection());
    assert!(RpcModuleSelection::are_identical(Some(&explicit_standard), Some(&standard)));
    // Equal explicit selections.
    let eth_only = RpcModuleSelection::Selection([RethRpcModule::Eth].into());
    let eth_only_again = RpcModuleSelection::Selection([RethRpcModule::Eth].into());
    assert!(RpcModuleSelection::are_identical(Some(&eth_only), Some(&eth_only_again)));
    // A missing selection is identical to an empty one, in both positions.
    assert!(RpcModuleSelection::are_identical(None, Some(&empty)));
    assert!(RpcModuleSelection::are_identical(Some(&empty), None));
    assert!(RpcModuleSelection::are_identical(None, None));
}
2380
/// Every listed module name parses into the expected variant and formats back to the
/// same name.
#[test]
fn test_rpc_module_str() {
    let cases = [
        ("admin", RethRpcModule::Admin),
        ("debug", RethRpcModule::Debug),
        ("eth", RethRpcModule::Eth),
        ("net", RethRpcModule::Net),
        ("trace", RethRpcModule::Trace),
        ("web3", RethRpcModule::Web3),
        ("rpc", RethRpcModule::Rpc),
        ("ots", RethRpcModule::Ots),
        ("reth", RethRpcModule::Reth),
    ];

    for (name, expected) in cases {
        let parsed: RethRpcModule = name.parse().unwrap();
        assert_eq!(parsed, expected);
        assert_eq!(parsed.to_string(), name);
    }
}
2405
/// The standard selection consists of the `eth`, `net` and `web3` modules.
#[test]
fn test_default_selection() {
    let standard_modules = RpcModuleSelection::Standard.to_selection();
    assert_eq!(
        standard_modules,
        [RethRpcModule::Eth, RethRpcModule::Net, RethRpcModule::Web3].into()
    )
}
2411
/// A selection can be created from a list of module names.
#[test]
fn test_create_rpc_module_config() {
    let names = vec!["eth", "admin"];
    let created = RpcModuleSelection::try_from_selection(names).unwrap();
    let expected = RpcModuleSelection::Selection([RethRpcModule::Eth, RethRpcModule::Admin].into());
    assert_eq!(created, expected);
}
2421
/// `with_http` only sets the http selection and leaves everything else unset.
#[test]
fn test_configure_transport_config() {
    let configured = TransportRpcModuleConfig::default()
        .with_http([RethRpcModule::Eth, RethRpcModule::Admin]);
    let expected = TransportRpcModuleConfig {
        http: Some(RpcModuleSelection::Selection(
            [RethRpcModule::Eth, RethRpcModule::Admin].into(),
        )),
        ws: None,
        ipc: None,
        config: None,
    };
    assert_eq!(configured, expected)
}
2438
/// An empty list of modules results in an empty (but present) http selection.
#[test]
fn test_configure_transport_config_none() {
    let no_modules = Vec::<RethRpcModule>::new();
    let configured = TransportRpcModuleConfig::default().with_http(no_modules);
    let expected = TransportRpcModuleConfig {
        http: Some(RpcModuleSelection::Selection(Default::default())),
        ws: None,
        ipc: None,
        config: None,
    };
    assert_eq!(configured, expected)
}
2452
2453 fn create_test_module() -> RpcModule<()> {
2454 let mut module = RpcModule::new(());
2455 module.register_method("anything", |_, _, _| "succeed").unwrap();
2456 module
2457 }
2458
/// Removing an http method reports whether the method existed.
#[test]
fn test_remove_http_method() {
    let mut modules =
        TransportRpcModules { http: Some(create_test_module()), ..Default::default() };

    // An existing method is removed.
    let removed_existing = modules.remove_http_method("anything");
    assert!(removed_existing);

    // An unknown method is reported as not removed.
    let removed_unknown = modules.remove_http_method("non_existent_method");
    assert!(!removed_unknown);

    // The removed method is gone from the module.
    let http = modules.http.as_ref().unwrap();
    assert!(http.method("anything").is_none());
}
2472
/// Removing a ws method reports whether the method existed.
#[test]
fn test_remove_ws_method() {
    let mut modules =
        TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };

    // An existing method is removed.
    let removed_existing = modules.remove_ws_method("anything");
    assert!(removed_existing);

    // An unknown method is reported as not removed.
    let removed_unknown = modules.remove_ws_method("non_existent_method");
    assert!(!removed_unknown);

    // The removed method is gone from the module.
    let ws = modules.ws.as_ref().unwrap();
    assert!(ws.method("anything").is_none());
}
2487
/// Removing an ipc method reports whether the method existed.
#[test]
fn test_remove_ipc_method() {
    let mut modules =
        TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };

    // An existing method is removed.
    let removed_existing = modules.remove_ipc_method("anything");
    assert!(removed_existing);

    // An unknown method is reported as not removed.
    let removed_unknown = modules.remove_ipc_method("non_existent_method");
    assert!(!removed_unknown);

    // The removed method is gone from the module.
    let ipc = modules.ipc.as_ref().unwrap();
    assert!(ipc.method("anything").is_none());
}
2502
/// Removing a method from all transports succeeds once and then reports `false`.
#[test]
fn test_remove_method_from_configured() {
    let mut modules = TransportRpcModules {
        http: Some(create_test_module()),
        ws: Some(create_test_module()),
        ipc: Some(create_test_module()),
        ..Default::default()
    };

    // The first removal finds the method.
    let first_attempt = modules.remove_method_from_configured("anything");
    assert!(first_attempt);

    // The second removal has nothing left to remove.
    let second_attempt = modules.remove_method_from_configured("anything");
    assert!(!second_attempt);

    // A method that never existed is not removed either.
    let unknown_attempt = modules.remove_method_from_configured("non_existent_method");
    assert!(!unknown_attempt);

    // No transport serves the method anymore.
    assert!(modules.http.as_ref().unwrap().method("anything").is_none());
    assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
    assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
}
2526
/// `rename` drops the old method and installs the new one on every transport.
#[test]
fn test_transport_rpc_module_rename() {
    let mut modules = TransportRpcModules {
        http: Some(create_test_module()),
        ws: Some(create_test_module()),
        ipc: Some(create_test_module()),
        ..Default::default()
    };

    // Before the rename only the old method exists on every transport.
    for transport in [&modules.http, &modules.ws, &modules.ipc] {
        let module = transport.as_ref().unwrap();
        assert!(module.method("anything").is_some());
        assert!(module.method("something").is_none());
    }

    // The replacement module provides the new method.
    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    modules.rename("anything", replacement).expect("rename failed");

    // After the rename only the new method exists on every transport.
    for transport in [&modules.http, &modules.ws, &modules.ipc] {
        let module = transport.as_ref().unwrap();
        assert!(module.method("anything").is_none());
        assert!(module.method("something").is_some());
    }
}
2563
/// `replace_http` adds new methods and replaces methods that already exist.
#[test]
fn test_replace_http_method() {
    let mut modules =
        TransportRpcModules { http: Some(create_test_module()), ..Default::default() };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    // A method that does not exist yet is added.
    let installed = modules.replace_http(replacement.clone()).unwrap();
    assert!(installed);
    assert!(modules.http.as_ref().unwrap().method("something").is_some());

    // A method with an already registered name is replaced without an error.
    replacement.register_method("anything", |_, _, _| "fails").unwrap();
    let installed = modules.replace_http(replacement.clone()).unwrap();
    assert!(installed);
    assert!(modules.http.as_ref().unwrap().method("anything").is_some());
}
/// `replace_ipc` adds new methods and replaces methods that already exist.
#[test]
fn test_replace_ipc_method() {
    let mut modules =
        TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    // A method that does not exist yet is added.
    let installed = modules.replace_ipc(replacement.clone()).unwrap();
    assert!(installed);
    assert!(modules.ipc.as_ref().unwrap().method("something").is_some());

    // A method with an already registered name is replaced without an error.
    replacement.register_method("anything", |_, _, _| "fails").unwrap();
    let installed = modules.replace_ipc(replacement.clone()).unwrap();
    assert!(installed);
    assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
}
/// `replace_ws` adds new methods and replaces methods that already exist.
#[test]
fn test_replace_ws_method() {
    let mut modules =
        TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    // A method that does not exist yet is added.
    let installed = modules.replace_ws(replacement.clone()).unwrap();
    assert!(installed);
    assert!(modules.ws.as_ref().unwrap().method("something").is_some());

    // A method with an already registered name is replaced without an error.
    replacement.register_method("anything", |_, _, _| "fails").unwrap();
    let installed = modules.replace_ws(replacement.clone()).unwrap();
    assert!(installed);
    assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
}
2615
/// Checks that `replace_configured` reports success and that afterwards every transport
/// (HTTP, IPC, WS) exposes both `something` and `anything`.
#[test]
fn test_replace_configured() {
    let mut transports = TransportRpcModules {
        http: Some(create_test_module()),
        ws: Some(create_test_module()),
        ipc: Some(create_test_module()),
        ..Default::default()
    };

    let mut replacement = RpcModule::new(());
    replacement.register_method("something", |_, _, _| "fails").unwrap();

    let replaced = transports.replace_configured(replacement).unwrap();
    assert!(replaced);

    // `something` comes from the replacement module; `anything` is never registered in this
    // test, so it is presumably provided by `create_test_module` (defined elsewhere).
    for name in ["something", "anything"] {
        for transport in [&transports.http, &transports.ipc, &transports.ws] {
            assert!(transport.as_ref().unwrap().method(name).is_some());
        }
    }
}
2638
/// Checks `add_or_replace_if_module_configured` with `Eth` configured for HTTP and WS only:
/// the call must succeed, HTTP and WS must expose both `eth_existing` and `eth_new`, and the
/// IPC module must expose neither.
#[test]
fn test_add_or_replace_if_module_configured() {
    let config = TransportRpcModuleConfig::default()
        .with_http([RethRpcModule::Eth])
        .with_ws([RethRpcModule::Eth]);

    // Builds a module that already carries an `eth_existing` method.
    let seeded_module = || {
        let mut module = RpcModule::new(());
        module.register_method("eth_existing", |_, _, _| "original").unwrap();
        module
    };

    // HTTP and WS start with `eth_existing`; IPC starts empty.
    let mut modules = TransportRpcModules {
        config,
        http: Some(seeded_module()),
        ws: Some(seeded_module()),
        ipc: Some(RpcModule::new(())),
    };

    // Incoming methods: one name that collides with the seeded method and one new name.
    let mut incoming = RpcModule::new(());
    incoming.register_method("eth_existing", |_, _, _| "replaced").unwrap();
    incoming.register_method("eth_new", |_, _, _| "added").unwrap();
    let new_methods = Methods::from(incoming);

    let result = modules.add_or_replace_if_module_configured(RethRpcModule::Eth, new_methods);
    assert!(result.is_ok(), "Function should succeed");

    // Transports with `Eth` configured expose both names.
    for configured in [&modules.http, &modules.ws] {
        let module = configured.as_ref().unwrap();
        assert!(module.method("eth_existing").is_some());
        assert!(module.method("eth_new").is_some());
    }

    // IPC has no `Eth` in its config and must expose neither name.
    let ipc = modules.ipc.as_ref().unwrap();
    assert!(ipc.method("eth_existing").is_none());
    assert!(ipc.method("eth_new").is_none());
}
2690
/// Checks that `merge_if_module_configured_with` invokes its closure when the requested module
/// (`Eth`) is in the config, and does not invoke it when the requested module (`Debug`) is not.
#[test]
fn test_merge_if_module_configured_with_lazy_evaluation() {
    // Only `Eth` over HTTP is configured; WS and IPC are absent.
    let config = TransportRpcModuleConfig::default().with_http([RethRpcModule::Eth]);
    let mut modules =
        TransportRpcModules { config, http: Some(RpcModule::new(())), ws: None, ipc: None };

    // Phase 1: configured module, so the closure is expected to run and its method to be merged.
    let mut eth_closure_ran = false;
    let eth_result = modules.merge_if_module_configured_with(RethRpcModule::Eth, || {
        eth_closure_ran = true;
        let mut built = RpcModule::new(());
        built.register_method("eth_test", |_, _, _| "test").unwrap();
        built.into()
    });

    assert!(eth_result.is_ok());
    assert!(eth_closure_ran, "Closure should be called when module is configured");
    let http = modules.http.as_ref().unwrap();
    assert!(http.method("eth_test").is_some());

    // Phase 2: unconfigured module, so the closure must never be evaluated.
    let mut debug_closure_ran = false;
    let debug_result = modules.merge_if_module_configured_with(RethRpcModule::Debug, || {
        debug_closure_ran = true;
        RpcModule::new(()).into()
    });

    assert!(debug_result.is_ok());
    assert!(!debug_closure_ran, "Closure should NOT be called when module is not configured");
}
2724}