1#![doc(
15 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
16 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
17 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
18)]
19#![cfg_attr(not(test), warn(unused_crate_dependencies))]
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
23use alloy_network::{Ethereum, IntoWallet};
24use alloy_provider::{fillers::RecommendedFillers, Provider, ProviderBuilder};
25use core::marker::PhantomData;
26use error::{ConflictingModules, RpcError, ServerKind};
27use http::{header::AUTHORIZATION, HeaderMap};
28use jsonrpsee::{
29 core::RegisterMethodError,
30 server::{middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider, ServerHandle},
31 Methods, RpcModule,
32};
33use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
34use reth_consensus::{ConsensusError, FullConsensus};
35use reth_engine_primitives::ConsensusEngineEvent;
36use reth_evm::ConfigureEvm;
37use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
38use reth_primitives_traits::{NodePrimitives, TxTy};
39use reth_rpc::{
40 AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
41 OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
42};
43use reth_rpc_api::servers::*;
44use reth_rpc_eth_api::{
45 helpers::{
46 pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
47 TraceExt,
48 },
49 node::RpcNodeCoreAdapter,
50 EthApiServer, EthApiTypes, FullEthApiServer, FullEthApiTypes, RpcBlock, RpcConvert,
51 RpcConverter, RpcHeader, RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
52};
53use reth_rpc_eth_types::{receipt::EthReceiptConverter, EthConfig, EthSubscriptionIdProvider};
54use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
55pub use reth_rpc_server_types::RethRpcModule;
56use reth_storage_api::{
57 AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, NodePrimitivesProvider,
58 StateProviderFactory,
59};
60use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
61use reth_tokio_util::EventSender;
62use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
63use serde::{Deserialize, Serialize};
64use std::{
65 collections::HashMap,
66 fmt::Debug,
67 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
68 time::{Duration, SystemTime, UNIX_EPOCH},
69};
70use tower_http::cors::CorsLayer;
71
72pub use cors::CorsDomainError;
73
74pub use jsonrpsee::server::ServerBuilder;
76use jsonrpsee::server::ServerConfigBuilder;
77pub use reth_ipc::server::{
78 Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
79};
80pub use reth_rpc_server_types::{constants, RpcModuleSelection};
81pub use tower::layer::util::{Identity, Stack};
82
83pub mod auth;
85
86pub mod config;
88
89pub mod middleware;
91
92mod cors;
94
95pub mod error;
97
98pub mod eth;
100pub use eth::EthHandlers;
101
102mod metrics;
104use crate::middleware::RethRpcMiddleware;
105pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
106use reth_chain_state::CanonStateSubscriptions;
107use reth_rpc::eth::sim_bundle::EthSimBundle;
108
109pub mod rate_limiter;
111
112#[derive(Debug, Clone)]
116pub struct RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus> {
117 provider: Provider,
119 pool: Pool,
121 network: Network,
123 executor: Box<dyn TaskSpawner + 'static>,
125 evm_config: EvmConfig,
127 consensus: Consensus,
129 _primitives: PhantomData<N>,
131}
132
133impl<N, Provider, Pool, Network, EvmConfig, Consensus>
136 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
137{
138 pub const fn new(
140 provider: Provider,
141 pool: Pool,
142 network: Network,
143 executor: Box<dyn TaskSpawner + 'static>,
144 evm_config: EvmConfig,
145 consensus: Consensus,
146 ) -> Self {
147 Self { provider, pool, network, executor, evm_config, consensus, _primitives: PhantomData }
148 }
149
150 pub fn with_provider<P>(
152 self,
153 provider: P,
154 ) -> RpcModuleBuilder<N, P, Pool, Network, EvmConfig, Consensus> {
155 let Self { pool, network, executor, evm_config, consensus, _primitives, .. } = self;
156 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
157 }
158
159 pub fn with_pool<P>(
161 self,
162 pool: P,
163 ) -> RpcModuleBuilder<N, Provider, P, Network, EvmConfig, Consensus> {
164 let Self { provider, network, executor, evm_config, consensus, _primitives, .. } = self;
165 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
166 }
167
168 pub fn with_noop_pool(
174 self,
175 ) -> RpcModuleBuilder<N, Provider, NoopTransactionPool, Network, EvmConfig, Consensus> {
176 let Self { provider, executor, network, evm_config, consensus, _primitives, .. } = self;
177 RpcModuleBuilder {
178 provider,
179 executor,
180 network,
181 evm_config,
182 pool: NoopTransactionPool::default(),
183 consensus,
184 _primitives,
185 }
186 }
187
188 pub fn with_network<Net>(
190 self,
191 network: Net,
192 ) -> RpcModuleBuilder<N, Provider, Pool, Net, EvmConfig, Consensus> {
193 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
194 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
195 }
196
197 pub fn with_noop_network(
203 self,
204 ) -> RpcModuleBuilder<N, Provider, Pool, NoopNetwork, EvmConfig, Consensus> {
205 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
206 RpcModuleBuilder {
207 provider,
208 pool,
209 executor,
210 network: NoopNetwork::default(),
211 evm_config,
212 consensus,
213 _primitives,
214 }
215 }
216
217 pub fn with_executor(self, executor: Box<dyn TaskSpawner + 'static>) -> Self {
219 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
220 Self { provider, network, pool, executor, evm_config, consensus, _primitives }
221 }
222
223 pub fn with_tokio_executor(self) -> Self {
228 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
229 Self {
230 provider,
231 network,
232 pool,
233 executor: Box::new(TokioTaskExecutor::default()),
234 evm_config,
235 consensus,
236 _primitives,
237 }
238 }
239
240 pub fn with_evm_config<E>(
242 self,
243 evm_config: E,
244 ) -> RpcModuleBuilder<N, Provider, Pool, Network, E, Consensus> {
245 let Self { provider, pool, executor, network, consensus, _primitives, .. } = self;
246 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
247 }
248
249 pub fn with_consensus<C>(
251 self,
252 consensus: C,
253 ) -> RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, C> {
254 let Self { provider, network, pool, executor, evm_config, _primitives, .. } = self;
255 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
256 }
257
258 #[expect(clippy::type_complexity)]
260 pub fn eth_api_builder<ChainSpec>(
261 &self,
262 ) -> EthApiBuilder<
263 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
264 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
265 >
266 where
267 Provider: Clone,
268 Pool: Clone,
269 Network: Clone,
270 EvmConfig: Clone,
271 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
272 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
273 {
274 EthApiBuilder::new(
275 self.provider.clone(),
276 self.pool.clone(),
277 self.network.clone(),
278 self.evm_config.clone(),
279 )
280 }
281
282 #[expect(clippy::type_complexity)]
288 pub fn bootstrap_eth_api<ChainSpec>(
289 &self,
290 ) -> EthApi<
291 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
292 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
293 >
294 where
295 Provider: Clone,
296 Pool: Clone,
297 Network: Clone,
298 EvmConfig: ConfigureEvm + Clone,
299 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
300 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
301 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>: RpcConvert,
302 (): PendingEnvBuilder<EvmConfig>,
303 {
304 self.eth_api_builder().build()
305 }
306}
307
308impl<N, Provider, Pool, Network, EvmConfig, Consensus>
309 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
310where
311 N: NodePrimitives,
312 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
313 + CanonStateSubscriptions<Primitives = N>
314 + AccountReader
315 + ChangeSetReader,
316 Pool: TransactionPool + Clone + 'static,
317 Network: NetworkInfo + Peers + Clone + 'static,
318 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
319 Consensus: FullConsensus<N, Error = ConsensusError> + Clone + 'static,
320{
321 pub fn build_with_auth_server<EthApi>(
328 self,
329 module_config: TransportRpcModuleConfig,
330 engine: impl IntoEngineApiRpcModule,
331 eth: EthApi,
332 engine_events: EventSender<ConsensusEngineEvent<N>>,
333 ) -> (
334 TransportRpcModules,
335 AuthRpcModule,
336 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>,
337 )
338 where
339 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
340 {
341 let config = module_config.config.clone().unwrap_or_default();
342
343 let mut registry = self.into_registry(config, eth, engine_events);
344 let modules = registry.create_transport_rpc_modules(module_config);
345 let auth_module = registry.create_auth_module(engine);
346
347 (modules, auth_module, registry)
348 }
349
350 pub fn into_registry<EthApi>(
355 self,
356 config: RpcModuleConfig,
357 eth: EthApi,
358 engine_events: EventSender<ConsensusEngineEvent<N>>,
359 ) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
360 where
361 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
362 {
363 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
364 RpcRegistryInner::new(
365 provider,
366 pool,
367 network,
368 executor,
369 consensus,
370 config,
371 evm_config,
372 eth,
373 engine_events,
374 )
375 }
376
377 pub fn build<EthApi>(
380 self,
381 module_config: TransportRpcModuleConfig,
382 eth: EthApi,
383 engine_events: EventSender<ConsensusEngineEvent<N>>,
384 ) -> TransportRpcModules<()>
385 where
386 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
387 {
388 let mut modules = TransportRpcModules::default();
389
390 if !module_config.is_empty() {
391 let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
392
393 let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
394
395 modules.config = module_config;
396 modules.http = registry.maybe_module(http.as_ref());
397 modules.ws = registry.maybe_module(ws.as_ref());
398 modules.ipc = registry.maybe_module(ipc.as_ref());
399 }
400
401 modules
402 }
403}
404
405impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
406 fn default() -> Self {
407 Self::new((), (), (), Box::new(TokioTaskExecutor::default()), (), ())
408 }
409}
410
411#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
413pub struct RpcModuleConfig {
414 eth: EthConfig,
416}
417
418impl RpcModuleConfig {
421 pub fn builder() -> RpcModuleConfigBuilder {
423 RpcModuleConfigBuilder::default()
424 }
425
426 pub const fn new(eth: EthConfig) -> Self {
428 Self { eth }
429 }
430
431 pub const fn eth(&self) -> &EthConfig {
433 &self.eth
434 }
435
436 pub const fn eth_mut(&mut self) -> &mut EthConfig {
438 &mut self.eth
439 }
440}
441
442#[derive(Clone, Debug, Default)]
444pub struct RpcModuleConfigBuilder {
445 eth: Option<EthConfig>,
446}
447
448impl RpcModuleConfigBuilder {
451 pub fn eth(mut self, eth: EthConfig) -> Self {
453 self.eth = Some(eth);
454 self
455 }
456
457 pub fn build(self) -> RpcModuleConfig {
459 let Self { eth } = self;
460 RpcModuleConfig { eth: eth.unwrap_or_default() }
461 }
462
463 pub const fn get_eth(&self) -> Option<&EthConfig> {
465 self.eth.as_ref()
466 }
467
468 pub const fn eth_mut(&mut self) -> &mut Option<EthConfig> {
470 &mut self.eth
471 }
472
473 pub fn eth_mut_or_default(&mut self) -> &mut EthConfig {
475 self.eth.get_or_insert_with(EthConfig::default)
476 }
477}
478
479#[derive(Debug)]
481pub struct RpcRegistryInner<Provider, Pool, Network, EthApi: EthApiTypes, EvmConfig, Consensus> {
482 provider: Provider,
483 pool: Pool,
484 network: Network,
485 executor: Box<dyn TaskSpawner + 'static>,
486 evm_config: EvmConfig,
487 consensus: Consensus,
488 eth: EthHandlers<EthApi>,
490 blocking_pool_guard: BlockingTaskGuard,
492 modules: HashMap<RethRpcModule, Methods>,
494 eth_config: EthConfig,
496 engine_events:
498 EventSender<ConsensusEngineEvent<<EthApi::RpcConvert as RpcConvert>::Primitives>>,
499}
500
501impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
504 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
505where
506 N: NodePrimitives,
507 Provider: StateProviderFactory
508 + CanonStateSubscriptions<Primitives = N>
509 + BlockReader<Block = N::Block, Receipt = N::Receipt>
510 + Clone
511 + Unpin
512 + 'static,
513 Pool: Send + Sync + Clone + 'static,
514 Network: Clone + 'static,
515 EthApi: FullEthApiTypes + 'static,
516 EvmConfig: ConfigureEvm<Primitives = N>,
517{
518 #[expect(clippy::too_many_arguments)]
520 pub fn new(
521 provider: Provider,
522 pool: Pool,
523 network: Network,
524 executor: Box<dyn TaskSpawner + 'static>,
525 consensus: Consensus,
526 config: RpcModuleConfig,
527 evm_config: EvmConfig,
528 eth_api: EthApi,
529 engine_events: EventSender<
530 ConsensusEngineEvent<<EthApi::Provider as NodePrimitivesProvider>::Primitives>,
531 >,
532 ) -> Self
533 where
534 EvmConfig: ConfigureEvm<Primitives = N>,
535 {
536 let blocking_pool_guard = BlockingTaskGuard::new(config.eth.max_tracing_requests);
537
538 let eth = EthHandlers::bootstrap(config.eth.clone(), executor.clone(), eth_api);
539
540 Self {
541 provider,
542 pool,
543 network,
544 eth,
545 executor,
546 consensus,
547 modules: Default::default(),
548 blocking_pool_guard,
549 eth_config: config.eth,
550 evm_config,
551 engine_events,
552 }
553 }
554}
555
556impl<Provider, Pool, Network, EthApi, Evm, Consensus>
557 RpcRegistryInner<Provider, Pool, Network, EthApi, Evm, Consensus>
558where
559 EthApi: EthApiTypes,
560{
561 pub const fn eth_api(&self) -> &EthApi {
563 &self.eth.api
564 }
565
566 pub const fn eth_handlers(&self) -> &EthHandlers<EthApi> {
568 &self.eth
569 }
570
571 pub const fn pool(&self) -> &Pool {
573 &self.pool
574 }
575
576 pub const fn tasks(&self) -> &(dyn TaskSpawner + 'static) {
578 &*self.executor
579 }
580
581 pub const fn provider(&self) -> &Provider {
583 &self.provider
584 }
585
586 pub const fn evm_config(&self) -> &Evm {
588 &self.evm_config
589 }
590
591 pub fn methods(&self) -> Vec<Methods> {
593 self.modules.values().cloned().collect()
594 }
595
596 pub fn module(&self) -> RpcModule<()> {
598 let mut module = RpcModule::new(());
599 for methods in self.modules.values().cloned() {
600 module.merge(methods).expect("No conflicts");
601 }
602 module
603 }
604}
605
606impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
607 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
608where
609 Network: NetworkInfo + Clone + 'static,
610 EthApi: EthApiTypes,
611 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks>,
612 EvmConfig: ConfigureEvm,
613{
614 pub fn admin_api(&self) -> AdminApi<Network, Provider::ChainSpec, Pool>
616 where
617 Network: Peers,
618 Pool: TransactionPool + Clone + 'static,
619 {
620 AdminApi::new(self.network.clone(), self.provider.chain_spec(), self.pool.clone())
621 }
622
623 pub fn web3_api(&self) -> Web3Api<Network> {
625 Web3Api::new(self.network.clone())
626 }
627
628 pub fn register_admin(&mut self) -> &mut Self
630 where
631 Network: Peers,
632 Pool: TransactionPool + Clone + 'static,
633 {
634 let adminapi = self.admin_api();
635 self.modules.insert(RethRpcModule::Admin, adminapi.into_rpc().into());
636 self
637 }
638
639 pub fn register_web3(&mut self) -> &mut Self {
641 let web3api = self.web3_api();
642 self.modules.insert(RethRpcModule::Web3, web3api.into_rpc().into());
643 self
644 }
645}
646
647impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
648 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
649where
650 N: NodePrimitives,
651 Provider: FullRpcProvider<
652 Header = N::BlockHeader,
653 Block = N::Block,
654 Receipt = N::Receipt,
655 Transaction = N::SignedTx,
656 > + AccountReader
657 + ChangeSetReader
658 + CanonStateSubscriptions,
659 Network: NetworkInfo + Peers + Clone + 'static,
660 EthApi: EthApiServer<
661 RpcTxReq<EthApi::NetworkTypes>,
662 RpcTransaction<EthApi::NetworkTypes>,
663 RpcBlock<EthApi::NetworkTypes>,
664 RpcReceipt<EthApi::NetworkTypes>,
665 RpcHeader<EthApi::NetworkTypes>,
666 TxTy<N>,
667 > + EthApiTypes,
668 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
669{
670 pub fn register_eth(&mut self) -> &mut Self {
676 let eth_api = self.eth_api().clone();
677 self.modules.insert(RethRpcModule::Eth, eth_api.into_rpc().into());
678 self
679 }
680
681 pub fn register_ots(&mut self) -> &mut Self
687 where
688 EthApi: TraceExt + EthTransactions<Primitives = N>,
689 {
690 let otterscan_api = self.otterscan_api();
691 self.modules.insert(RethRpcModule::Ots, otterscan_api.into_rpc().into());
692 self
693 }
694
695 pub fn register_debug(&mut self) -> &mut Self
701 where
702 EthApi: EthTransactions + TraceExt,
703 {
704 let debug_api = self.debug_api();
705 self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
706 self
707 }
708
709 pub fn register_trace(&mut self) -> &mut Self
715 where
716 EthApi: TraceExt,
717 {
718 let trace_api = self.trace_api();
719 self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
720 self
721 }
722
723 pub fn register_net(&mut self) -> &mut Self
731 where
732 EthApi: EthApiSpec + 'static,
733 {
734 let netapi = self.net_api();
735 self.modules.insert(RethRpcModule::Net, netapi.into_rpc().into());
736 self
737 }
738
739 pub fn register_reth(&mut self) -> &mut Self {
747 let rethapi = self.reth_api();
748 self.modules.insert(RethRpcModule::Reth, rethapi.into_rpc().into());
749 self
750 }
751
752 pub fn otterscan_api(&self) -> OtterscanApi<EthApi> {
758 let eth_api = self.eth_api().clone();
759 OtterscanApi::new(eth_api)
760 }
761}
762
763impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
764 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
765where
766 N: NodePrimitives,
767 Provider: FullRpcProvider<
768 Block = N::Block,
769 Header = N::BlockHeader,
770 Transaction = N::SignedTx,
771 Receipt = N::Receipt,
772 > + AccountReader
773 + ChangeSetReader,
774 Network: NetworkInfo + Peers + Clone + 'static,
775 EthApi: EthApiTypes,
776 EvmConfig: ConfigureEvm<Primitives = N>,
777{
778 pub fn trace_api(&self) -> TraceApi<EthApi> {
784 TraceApi::new(
785 self.eth_api().clone(),
786 self.blocking_pool_guard.clone(),
787 self.eth_config.clone(),
788 )
789 }
790
791 pub fn bundle_api(&self) -> EthBundle<EthApi>
797 where
798 EthApi: EthTransactions + LoadPendingBlock + Call,
799 {
800 let eth_api = self.eth_api().clone();
801 EthBundle::new(eth_api, self.blocking_pool_guard.clone())
802 }
803
804 pub fn debug_api(&self) -> DebugApi<EthApi>
810 where
811 EthApi: FullEthApiTypes,
812 {
813 DebugApi::new(
814 self.eth_api().clone(),
815 self.blocking_pool_guard.clone(),
816 self.tasks(),
817 self.engine_events.new_listener(),
818 )
819 }
820
821 pub fn net_api(&self) -> NetApi<Network, EthApi>
827 where
828 EthApi: EthApiSpec + 'static,
829 {
830 let eth_api = self.eth_api().clone();
831 NetApi::new(self.network.clone(), eth_api)
832 }
833
834 pub fn reth_api(&self) -> RethApi<Provider> {
836 RethApi::new(self.provider.clone(), self.executor.clone())
837 }
838}
839
840impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
841 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
842where
843 N: NodePrimitives,
844 Provider: FullRpcProvider<Block = N::Block>
845 + CanonStateSubscriptions<Primitives = N>
846 + AccountReader
847 + ChangeSetReader,
848 Pool: TransactionPool + Clone + 'static,
849 Network: NetworkInfo + Peers + Clone + 'static,
850 EthApi: FullEthApiServer,
851 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
852 Consensus: FullConsensus<N, Error = ConsensusError> + Clone + 'static,
853{
854 pub fn create_auth_module(&self, engine_api: impl IntoEngineApiRpcModule) -> AuthRpcModule {
860 let mut module = engine_api.into_rpc_module();
861
862 let eth_handlers = self.eth_handlers();
864 let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
865
866 module.merge(engine_eth.into_rpc()).expect("No conflicting methods");
867
868 AuthRpcModule { inner: module }
869 }
870
871 fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
873 config.map(|config| self.module_for(config))
874 }
875
876 pub fn create_transport_rpc_modules(
880 &mut self,
881 config: TransportRpcModuleConfig,
882 ) -> TransportRpcModules<()> {
883 let mut modules = TransportRpcModules::default();
884 let http = self.maybe_module(config.http.as_ref());
885 let ws = self.maybe_module(config.ws.as_ref());
886 let ipc = self.maybe_module(config.ipc.as_ref());
887
888 modules.config = config;
889 modules.http = http;
890 modules.ws = ws;
891 modules.ipc = ipc;
892 modules
893 }
894
895 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
898 let mut module = RpcModule::new(());
899 let all_methods = self.reth_methods(config.iter_selection());
900 for methods in all_methods {
901 module.merge(methods).expect("No conflicts");
902 }
903 module
904 }
905
906 pub fn reth_methods(
915 &mut self,
916 namespaces: impl Iterator<Item = RethRpcModule>,
917 ) -> Vec<Methods> {
918 let EthHandlers { api: eth_api, filter: eth_filter, pubsub: eth_pubsub, .. } =
919 self.eth_handlers().clone();
920
921 let namespaces: Vec<_> = namespaces.collect();
923 namespaces
924 .iter()
925 .map(|namespace| {
926 self.modules
927 .entry(namespace.clone())
928 .or_insert_with(|| match namespace.clone() {
929 RethRpcModule::Admin => AdminApi::new(
930 self.network.clone(),
931 self.provider.chain_spec(),
932 self.pool.clone(),
933 )
934 .into_rpc()
935 .into(),
936 RethRpcModule::Debug => DebugApi::new(
937 eth_api.clone(),
938 self.blocking_pool_guard.clone(),
939 &*self.executor,
940 self.engine_events.new_listener(),
941 )
942 .into_rpc()
943 .into(),
944 RethRpcModule::Eth => {
945 let mut module = eth_api.clone().into_rpc();
947 module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
948 module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
949 module
950 .merge(
951 EthBundle::new(
952 eth_api.clone(),
953 self.blocking_pool_guard.clone(),
954 )
955 .into_rpc(),
956 )
957 .expect("No conflicts");
958
959 module.into()
960 }
961 RethRpcModule::Net => {
962 NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
963 }
964 RethRpcModule::Trace => TraceApi::new(
965 eth_api.clone(),
966 self.blocking_pool_guard.clone(),
967 self.eth_config.clone(),
968 )
969 .into_rpc()
970 .into(),
971 RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
972 RethRpcModule::Txpool => TxPoolApi::new(
973 self.eth.api.pool().clone(),
974 dyn_clone::clone(self.eth.api.converter()),
975 )
976 .into_rpc()
977 .into(),
978 RethRpcModule::Rpc => RPCApi::new(
979 namespaces
980 .iter()
981 .map(|module| (module.to_string(), "1.0".to_string()))
982 .collect(),
983 )
984 .into_rpc()
985 .into(),
986 RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
987 RethRpcModule::Reth => {
988 RethApi::new(self.provider.clone(), self.executor.clone())
989 .into_rpc()
990 .into()
991 }
992 RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
993 RethRpcModule::Mev => {
994 EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
995 .into_rpc()
996 .into()
997 }
998 RethRpcModule::Flashbots |
1002 RethRpcModule::Testing |
1003 RethRpcModule::Other(_) => Default::default(),
1004 })
1005 .clone()
1006 })
1007 .collect::<Vec<_>>()
1008 }
1009}
1010
1011impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus> Clone
1012 for RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
1013where
1014 EthApi: EthApiTypes,
1015 Provider: Clone,
1016 Pool: Clone,
1017 Network: Clone,
1018 EvmConfig: Clone,
1019 Consensus: Clone,
1020{
1021 fn clone(&self) -> Self {
1022 Self {
1023 provider: self.provider.clone(),
1024 pool: self.pool.clone(),
1025 network: self.network.clone(),
1026 executor: self.executor.clone(),
1027 evm_config: self.evm_config.clone(),
1028 consensus: self.consensus.clone(),
1029 eth: self.eth.clone(),
1030 blocking_pool_guard: self.blocking_pool_guard.clone(),
1031 modules: self.modules.clone(),
1032 eth_config: self.eth_config.clone(),
1033 engine_events: self.engine_events.clone(),
1034 }
1035 }
1036}
1037
1038#[derive(Debug)]
1050pub struct RpcServerConfig<RpcMiddleware = Identity> {
1051 http_server_config: Option<ServerConfigBuilder>,
1053 http_cors_domains: Option<String>,
1055 http_addr: Option<SocketAddr>,
1057 http_disable_compression: bool,
1059 ws_server_config: Option<ServerConfigBuilder>,
1061 ws_cors_domains: Option<String>,
1063 ws_addr: Option<SocketAddr>,
1065 ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
1067 ipc_endpoint: Option<String>,
1069 jwt_secret: Option<JwtSecret>,
1071 rpc_middleware: RpcMiddleware,
1073}
1074
1075impl Default for RpcServerConfig<Identity> {
1078 fn default() -> Self {
1080 Self {
1081 http_server_config: None,
1082 http_cors_domains: None,
1083 http_addr: None,
1084 http_disable_compression: false,
1085 ws_server_config: None,
1086 ws_cors_domains: None,
1087 ws_addr: None,
1088 ipc_server_config: None,
1089 ipc_endpoint: None,
1090 jwt_secret: None,
1091 rpc_middleware: Default::default(),
1092 }
1093 }
1094}
1095
1096impl RpcServerConfig {
1097 pub fn http(config: ServerConfigBuilder) -> Self {
1099 Self::default().with_http(config)
1100 }
1101
1102 pub fn ws(config: ServerConfigBuilder) -> Self {
1104 Self::default().with_ws(config)
1105 }
1106
1107 pub fn ipc(config: IpcServerBuilder<Identity, Identity>) -> Self {
1109 Self::default().with_ipc(config)
1110 }
1111
1112 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
1117 self.http_server_config =
1118 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1119 self
1120 }
1121
1122 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
1127 self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1128 self
1129 }
1130
1131 pub fn with_ipc(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
1136 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1137 self
1138 }
1139}
1140
1141impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
1142 pub fn set_rpc_middleware<T>(self, rpc_middleware: T) -> RpcServerConfig<T> {
1144 RpcServerConfig {
1145 http_server_config: self.http_server_config,
1146 http_cors_domains: self.http_cors_domains,
1147 http_addr: self.http_addr,
1148 http_disable_compression: self.http_disable_compression,
1149 ws_server_config: self.ws_server_config,
1150 ws_cors_domains: self.ws_cors_domains,
1151 ws_addr: self.ws_addr,
1152 ipc_server_config: self.ipc_server_config,
1153 ipc_endpoint: self.ipc_endpoint,
1154 jwt_secret: self.jwt_secret,
1155 rpc_middleware,
1156 }
1157 }
1158
1159 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
1161 self.with_http_cors(cors_domain.clone()).with_ws_cors(cors_domain)
1162 }
1163
1164 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
1166 self.ws_cors_domains = cors_domain;
1167 self
1168 }
1169
1170 pub const fn with_http_disable_compression(mut self, http_disable_compression: bool) -> Self {
1172 self.http_disable_compression = http_disable_compression;
1173 self
1174 }
1175
1176 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
1178 self.http_cors_domains = cors_domain;
1179 self
1180 }
1181
1182 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
1187 self.http_addr = Some(addr);
1188 self
1189 }
1190
1191 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
1196 self.ws_addr = Some(addr);
1197 self
1198 }
1199
1200 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
1204 where
1205 I: IdProvider + Clone + 'static,
1206 {
1207 if let Some(config) = self.http_server_config {
1208 self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
1209 }
1210 if let Some(config) = self.ws_server_config {
1211 self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
1212 }
1213 if let Some(ipc) = self.ipc_server_config {
1214 self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
1215 }
1216
1217 self
1218 }
1219
1220 pub fn with_ipc_endpoint(mut self, path: impl Into<String>) -> Self {
1224 self.ipc_endpoint = Some(path.into());
1225 self
1226 }
1227
1228 pub const fn with_jwt_secret(mut self, secret: Option<JwtSecret>) -> Self {
1230 self.jwt_secret = secret;
1231 self
1232 }
1233
1234 pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
1236 let Some(tokio_runtime) = tokio_runtime else { return self };
1237 if let Some(http_server_config) = self.http_server_config {
1238 self.http_server_config =
1239 Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1240 }
1241 if let Some(ws_server_config) = self.ws_server_config {
1242 self.ws_server_config =
1243 Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1244 }
1245 if let Some(ipc_server_config) = self.ipc_server_config {
1246 self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
1247 }
1248 self
1249 }
1250
1251 pub const fn has_server(&self) -> bool {
1255 self.http_server_config.is_some() ||
1256 self.ws_server_config.is_some() ||
1257 self.ipc_server_config.is_some()
1258 }
1259
1260 pub const fn http_address(&self) -> Option<SocketAddr> {
1262 self.http_addr
1263 }
1264
1265 pub const fn ws_address(&self) -> Option<SocketAddr> {
1267 self.ws_addr
1268 }
1269
1270 pub fn ipc_endpoint(&self) -> Option<String> {
1272 self.ipc_endpoint.clone()
1273 }
1274
1275 fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
1277 cors.as_deref().map(cors::create_cors_layer).transpose()
1278 }
1279
1280 fn maybe_jwt_layer(jwt_secret: Option<JwtSecret>) -> Option<AuthLayer<JwtAuthValidator>> {
1282 jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
1283 }
1284
1285 fn maybe_compression_layer(disable_compression: bool) -> Option<CompressionLayer> {
1288 if disable_compression {
1289 None
1290 } else {
1291 Some(CompressionLayer::new())
1292 }
1293 }
1294
1295 pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError>
1301 where
1302 RpcMiddleware: RethRpcMiddleware,
1303 {
1304 let mut http_handle = None;
1305 let mut ws_handle = None;
1306 let mut ipc_handle = None;
1307
1308 let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
1309 Ipv4Addr::LOCALHOST,
1310 constants::DEFAULT_HTTP_RPC_PORT,
1311 )));
1312
1313 let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
1314 Ipv4Addr::LOCALHOST,
1315 constants::DEFAULT_WS_RPC_PORT,
1316 )));
1317
1318 let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
1319 let ipc_path =
1320 self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());
1321
1322 if let Some(builder) = self.ipc_server_config {
1323 let ipc = builder
1324 .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
1325 .build(ipc_path);
1326 ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
1327 }
1328
1329 if self.http_addr == self.ws_addr &&
1331 self.http_server_config.is_some() &&
1332 self.ws_server_config.is_some()
1333 {
1334 let cors = match (self.ws_cors_domains.as_ref(), self.http_cors_domains.as_ref()) {
1335 (Some(ws_cors), Some(http_cors)) => {
1336 if ws_cors.trim() != http_cors.trim() {
1337 return Err(WsHttpSamePortError::ConflictingCorsDomains {
1338 http_cors_domains: Some(http_cors.clone()),
1339 ws_cors_domains: Some(ws_cors.clone()),
1340 }
1341 .into());
1342 }
1343 Some(ws_cors)
1344 }
1345 (a, b) => a.or(b),
1346 }
1347 .cloned();
1348
1349 modules.config.ensure_ws_http_identical()?;
1351
1352 if let Some(config) = self.http_server_config {
1353 let server = ServerBuilder::new()
1354 .set_http_middleware(
1355 tower::ServiceBuilder::new()
1356 .option_layer(Self::maybe_cors_layer(cors)?)
1357 .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
1358 .option_layer(Self::maybe_compression_layer(
1359 self.http_disable_compression,
1360 )),
1361 )
1362 .set_rpc_middleware(
1363 RpcServiceBuilder::default()
1364 .layer(
1365 modules
1366 .http
1367 .as_ref()
1368 .or(modules.ws.as_ref())
1369 .map(RpcRequestMetrics::same_port)
1370 .unwrap_or_default(),
1371 )
1372 .layer(self.rpc_middleware.clone()),
1373 )
1374 .set_config(config.build())
1375 .build(http_socket_addr)
1376 .await
1377 .map_err(|err| {
1378 RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
1379 })?;
1380 let addr = server.local_addr().map_err(|err| {
1381 RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
1382 })?;
1383 if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
1384 let handle = server.start(module.clone());
1385 http_handle = Some(handle.clone());
1386 ws_handle = Some(handle);
1387 }
1388 return Ok(RpcServerHandle {
1389 http_local_addr: Some(addr),
1390 ws_local_addr: Some(addr),
1391 http: http_handle,
1392 ws: ws_handle,
1393 ipc_endpoint: self.ipc_endpoint.clone(),
1394 ipc: ipc_handle,
1395 jwt_secret: self.jwt_secret,
1396 });
1397 }
1398 }
1399
1400 let mut ws_local_addr = None;
1401 let mut ws_server = None;
1402 let mut http_local_addr = None;
1403 let mut http_server = None;
1404
1405 if let Some(config) = self.ws_server_config {
1406 let server = ServerBuilder::new()
1407 .set_config(config.ws_only().build())
1408 .set_http_middleware(
1409 tower::ServiceBuilder::new()
1410 .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
1411 .option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
1412 )
1413 .set_rpc_middleware(
1414 RpcServiceBuilder::default()
1415 .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default())
1416 .layer(self.rpc_middleware.clone()),
1417 )
1418 .build(ws_socket_addr)
1419 .await
1420 .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
1421
1422 let addr = server
1423 .local_addr()
1424 .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
1425
1426 ws_local_addr = Some(addr);
1427 ws_server = Some(server);
1428 }
1429
1430 if let Some(config) = self.http_server_config {
1431 let server = ServerBuilder::new()
1432 .set_config(config.http_only().build())
1433 .set_http_middleware(
1434 tower::ServiceBuilder::new()
1435 .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
1436 .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
1437 .option_layer(Self::maybe_compression_layer(self.http_disable_compression)),
1438 )
1439 .set_rpc_middleware(
1440 RpcServiceBuilder::default()
1441 .layer(
1442 modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
1443 )
1444 .layer(self.rpc_middleware.clone()),
1445 )
1446 .build(http_socket_addr)
1447 .await
1448 .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
1449 let local_addr = server
1450 .local_addr()
1451 .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
1452 http_local_addr = Some(local_addr);
1453 http_server = Some(server);
1454 }
1455
1456 http_handle = http_server
1457 .map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
1458 ws_handle = ws_server
1459 .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
1460 Ok(RpcServerHandle {
1461 http_local_addr,
1462 ws_local_addr,
1463 http: http_handle,
1464 ws: ws_handle,
1465 ipc_endpoint: self.ipc_endpoint.clone(),
1466 ipc: ipc_handle,
1467 jwt_secret: self.jwt_secret,
1468 })
1469 }
1470}
1471
1472#[derive(Debug, Clone, Default, Eq, PartialEq)]
1484pub struct TransportRpcModuleConfig {
1485 http: Option<RpcModuleSelection>,
1487 ws: Option<RpcModuleSelection>,
1489 ipc: Option<RpcModuleSelection>,
1491 config: Option<RpcModuleConfig>,
1493}
1494
1495impl TransportRpcModuleConfig {
1498 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
1500 Self::default().with_http(http)
1501 }
1502
1503 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
1505 Self::default().with_ws(ws)
1506 }
1507
1508 pub fn set_ipc(ipc: impl Into<RpcModuleSelection>) -> Self {
1510 Self::default().with_ipc(ipc)
1511 }
1512
1513 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
1515 self.http = Some(http.into());
1516 self
1517 }
1518
1519 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
1521 self.ws = Some(ws.into());
1522 self
1523 }
1524
1525 pub fn with_ipc(mut self, ipc: impl Into<RpcModuleSelection>) -> Self {
1527 self.ipc = Some(ipc.into());
1528 self
1529 }
1530
1531 pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
1533 self.config = Some(config);
1534 self
1535 }
1536
1537 pub const fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1539 &mut self.http
1540 }
1541
1542 pub const fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1544 &mut self.ws
1545 }
1546
1547 pub const fn ipc_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1549 &mut self.ipc
1550 }
1551
1552 pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
1554 &mut self.config
1555 }
1556
1557 pub const fn is_empty(&self) -> bool {
1559 self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
1560 }
1561
1562 pub const fn http(&self) -> Option<&RpcModuleSelection> {
1564 self.http.as_ref()
1565 }
1566
1567 pub const fn ws(&self) -> Option<&RpcModuleSelection> {
1569 self.ws.as_ref()
1570 }
1571
1572 pub const fn ipc(&self) -> Option<&RpcModuleSelection> {
1574 self.ipc.as_ref()
1575 }
1576
1577 pub const fn config(&self) -> Option<&RpcModuleConfig> {
1579 self.config.as_ref()
1580 }
1581
1582 pub fn contains_any(&self, module: &RethRpcModule) -> bool {
1584 self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
1585 }
1586
1587 pub fn contains_http(&self, module: &RethRpcModule) -> bool {
1589 self.http.as_ref().is_some_and(|http| http.contains(module))
1590 }
1591
1592 pub fn contains_ws(&self, module: &RethRpcModule) -> bool {
1594 self.ws.as_ref().is_some_and(|ws| ws.contains(module))
1595 }
1596
1597 pub fn contains_ipc(&self, module: &RethRpcModule) -> bool {
1599 self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
1600 }
1601
1602 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
1605 if RpcModuleSelection::are_identical(self.http.as_ref(), self.ws.as_ref()) {
1606 Ok(())
1607 } else {
1608 let http_modules =
1609 self.http.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1610 let ws_modules =
1611 self.ws.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1612
1613 let http_not_ws = http_modules.difference(&ws_modules).cloned().collect();
1614 let ws_not_http = ws_modules.difference(&http_modules).cloned().collect();
1615 let overlap = http_modules.intersection(&ws_modules).cloned().collect();
1616
1617 Err(WsHttpSamePortError::ConflictingModules(Box::new(ConflictingModules {
1618 overlap,
1619 http_not_ws,
1620 ws_not_http,
1621 })))
1622 }
1623 }
1624}
1625
1626#[derive(Debug, Clone, Default)]
1628pub struct TransportRpcModules<Context = ()> {
1629 config: TransportRpcModuleConfig,
1631 http: Option<RpcModule<Context>>,
1633 ws: Option<RpcModule<Context>>,
1635 ipc: Option<RpcModule<Context>>,
1637}
1638
1639impl TransportRpcModules {
1642 pub fn with_config(mut self, config: TransportRpcModuleConfig) -> Self {
1645 self.config = config;
1646 self
1647 }
1648
1649 pub fn with_http(mut self, http: RpcModule<()>) -> Self {
1652 self.http = Some(http);
1653 self
1654 }
1655
1656 pub fn with_ws(mut self, ws: RpcModule<()>) -> Self {
1659 self.ws = Some(ws);
1660 self
1661 }
1662
1663 pub fn with_ipc(mut self, ipc: RpcModule<()>) -> Self {
1666 self.ipc = Some(ipc);
1667 self
1668 }
1669
1670 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
1672 &self.config
1673 }
1674
1675 pub fn merge_if_module_configured(
1680 &mut self,
1681 module: RethRpcModule,
1682 other: impl Into<Methods>,
1683 ) -> Result<(), RegisterMethodError> {
1684 let other = other.into();
1685 if self.module_config().contains_http(&module) {
1686 self.merge_http(other.clone())?;
1687 }
1688 if self.module_config().contains_ws(&module) {
1689 self.merge_ws(other.clone())?;
1690 }
1691 if self.module_config().contains_ipc(&module) {
1692 self.merge_ipc(other)?;
1693 }
1694
1695 Ok(())
1696 }
1697
1698 pub fn merge_if_module_configured_with<F>(
1705 &mut self,
1706 module: RethRpcModule,
1707 f: F,
1708 ) -> Result<(), RegisterMethodError>
1709 where
1710 F: FnOnce() -> Methods,
1711 {
1712 if !self.module_config().contains_any(&module) {
1714 return Ok(());
1715 }
1716 self.merge_if_module_configured(module, f())
1717 }
1718
1719 pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1725 if let Some(ref mut http) = self.http {
1726 return http.merge(other.into()).map(|_| true)
1727 }
1728 Ok(false)
1729 }
1730
1731 pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1737 if let Some(ref mut ws) = self.ws {
1738 return ws.merge(other.into()).map(|_| true)
1739 }
1740 Ok(false)
1741 }
1742
1743 pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1749 if let Some(ref mut ipc) = self.ipc {
1750 return ipc.merge(other.into()).map(|_| true)
1751 }
1752 Ok(false)
1753 }
1754
1755 pub fn merge_configured(
1759 &mut self,
1760 other: impl Into<Methods>,
1761 ) -> Result<(), RegisterMethodError> {
1762 let other = other.into();
1763 self.merge_http(other.clone())?;
1764 self.merge_ws(other.clone())?;
1765 self.merge_ipc(other)?;
1766 Ok(())
1767 }
1768
1769 pub fn methods_by_module(&self, module: RethRpcModule) -> Methods {
1773 self.methods_by(|name| name.starts_with(module.as_str()))
1774 }
1775
1776 pub fn methods_by<F>(&self, mut filter: F) -> Methods
1780 where
1781 F: FnMut(&str) -> bool,
1782 {
1783 let mut methods = Methods::new();
1784
1785 let mut f =
1787 |name: &str, mm: &Methods| filter(name) && !mm.method_names().any(|m| m == name);
1788
1789 if let Some(m) = self.http_methods(|name| f(name, &methods)) {
1790 let _ = methods.merge(m);
1791 }
1792 if let Some(m) = self.ws_methods(|name| f(name, &methods)) {
1793 let _ = methods.merge(m);
1794 }
1795 if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
1796 let _ = methods.merge(m);
1797 }
1798 methods
1799 }
1800
1801 pub fn http_methods<F>(&self, filter: F) -> Option<Methods>
1805 where
1806 F: FnMut(&str) -> bool,
1807 {
1808 self.http.as_ref().map(|module| methods_by(module, filter))
1809 }
1810
1811 pub fn ws_methods<F>(&self, filter: F) -> Option<Methods>
1815 where
1816 F: FnMut(&str) -> bool,
1817 {
1818 self.ws.as_ref().map(|module| methods_by(module, filter))
1819 }
1820
1821 pub fn ipc_methods<F>(&self, filter: F) -> Option<Methods>
1825 where
1826 F: FnMut(&str) -> bool,
1827 {
1828 self.ipc.as_ref().map(|module| methods_by(module, filter))
1829 }
1830
1831 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
1839 if let Some(http_module) = &mut self.http {
1840 http_module.remove_method(method_name).is_some()
1841 } else {
1842 false
1843 }
1844 }
1845
1846 pub fn remove_http_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1848 for name in methods {
1849 self.remove_http_method(name);
1850 }
1851 }
1852
1853 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
1861 if let Some(ws_module) = &mut self.ws {
1862 ws_module.remove_method(method_name).is_some()
1863 } else {
1864 false
1865 }
1866 }
1867
1868 pub fn remove_ws_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1870 for name in methods {
1871 self.remove_ws_method(name);
1872 }
1873 }
1874
1875 pub fn remove_ipc_method(&mut self, method_name: &'static str) -> bool {
1883 if let Some(ipc_module) = &mut self.ipc {
1884 ipc_module.remove_method(method_name).is_some()
1885 } else {
1886 false
1887 }
1888 }
1889
1890 pub fn remove_ipc_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1892 for name in methods {
1893 self.remove_ipc_method(name);
1894 }
1895 }
1896
1897 pub fn remove_method_from_configured(&mut self, method_name: &'static str) -> bool {
1901 let http_removed = self.remove_http_method(method_name);
1902 let ws_removed = self.remove_ws_method(method_name);
1903 let ipc_removed = self.remove_ipc_method(method_name);
1904
1905 http_removed || ws_removed || ipc_removed
1906 }
1907
1908 pub fn rename(
1912 &mut self,
1913 old_name: &'static str,
1914 new_method: impl Into<Methods>,
1915 ) -> Result<(), RegisterMethodError> {
1916 self.remove_method_from_configured(old_name);
1918
1919 self.merge_configured(new_method)
1921 }
1922
1923 pub fn replace_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1930 let other = other.into();
1931 self.remove_http_methods(other.method_names());
1932 self.merge_http(other)
1933 }
1934
1935 pub fn replace_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1942 let other = other.into();
1943 self.remove_ipc_methods(other.method_names());
1944 self.merge_ipc(other)
1945 }
1946
1947 pub fn replace_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1954 let other = other.into();
1955 self.remove_ws_methods(other.method_names());
1956 self.merge_ws(other)
1957 }
1958
1959 pub fn replace_configured(
1963 &mut self,
1964 other: impl Into<Methods>,
1965 ) -> Result<bool, RegisterMethodError> {
1966 let other = other.into();
1967 self.replace_http(other.clone())?;
1968 self.replace_ws(other.clone())?;
1969 self.replace_ipc(other)?;
1970 Ok(true)
1971 }
1972
1973 pub fn add_or_replace_http(
1977 &mut self,
1978 other: impl Into<Methods>,
1979 ) -> Result<bool, RegisterMethodError> {
1980 let other = other.into();
1981 self.remove_http_methods(other.method_names());
1982 self.merge_http(other)
1983 }
1984
1985 pub fn add_or_replace_ws(
1989 &mut self,
1990 other: impl Into<Methods>,
1991 ) -> Result<bool, RegisterMethodError> {
1992 let other = other.into();
1993 self.remove_ws_methods(other.method_names());
1994 self.merge_ws(other)
1995 }
1996
1997 pub fn add_or_replace_ipc(
2001 &mut self,
2002 other: impl Into<Methods>,
2003 ) -> Result<bool, RegisterMethodError> {
2004 let other = other.into();
2005 self.remove_ipc_methods(other.method_names());
2006 self.merge_ipc(other)
2007 }
2008
2009 pub fn add_or_replace_configured(
2011 &mut self,
2012 other: impl Into<Methods>,
2013 ) -> Result<(), RegisterMethodError> {
2014 let other = other.into();
2015 self.add_or_replace_http(other.clone())?;
2016 self.add_or_replace_ws(other.clone())?;
2017 self.add_or_replace_ipc(other)?;
2018 Ok(())
2019 }
2020 pub fn add_or_replace_if_module_configured(
2023 &mut self,
2024 module: RethRpcModule,
2025 other: impl Into<Methods>,
2026 ) -> Result<(), RegisterMethodError> {
2027 let other = other.into();
2028 if self.module_config().contains_http(&module) {
2029 self.add_or_replace_http(other.clone())?;
2030 }
2031 if self.module_config().contains_ws(&module) {
2032 self.add_or_replace_ws(other.clone())?;
2033 }
2034 if self.module_config().contains_ipc(&module) {
2035 self.add_or_replace_ipc(other)?;
2036 }
2037 Ok(())
2038 }
2039}
2040
2041fn methods_by<T, F>(module: &RpcModule<T>, mut filter: F) -> Methods
2043where
2044 F: FnMut(&str) -> bool,
2045{
2046 let mut methods = Methods::new();
2047 let method_names = module.method_names().filter(|name| filter(name));
2048
2049 for name in method_names {
2050 if let Some(matched_method) = module.method(name).cloned() {
2051 let _ = methods.verify_and_insert(name, matched_method);
2052 }
2053 }
2054
2055 methods
2056}
2057
2058#[derive(Clone, Debug)]
2063#[must_use = "Server stops if dropped"]
2064pub struct RpcServerHandle {
2065 http_local_addr: Option<SocketAddr>,
2067 ws_local_addr: Option<SocketAddr>,
2068 http: Option<ServerHandle>,
2069 ws: Option<ServerHandle>,
2070 ipc_endpoint: Option<String>,
2071 ipc: Option<jsonrpsee::server::ServerHandle>,
2072 jwt_secret: Option<JwtSecret>,
2073}
2074
2075impl RpcServerHandle {
2078 fn bearer_token(&self) -> Option<String> {
2080 self.jwt_secret.as_ref().map(|secret| {
2081 format!(
2082 "Bearer {}",
2083 secret
2084 .encode(&Claims {
2085 iat: (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +
2086 Duration::from_secs(60))
2087 .as_secs(),
2088 exp: None,
2089 })
2090 .unwrap()
2091 )
2092 })
2093 }
2094 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
2096 self.http_local_addr
2097 }
2098
2099 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
2101 self.ws_local_addr
2102 }
2103
2104 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
2106 if let Some(handle) = self.http {
2107 handle.stop()?
2108 }
2109
2110 if let Some(handle) = self.ws {
2111 handle.stop()?
2112 }
2113
2114 if let Some(handle) = self.ipc {
2115 handle.stop()?
2116 }
2117
2118 Ok(())
2119 }
2120
2121 pub fn ipc_endpoint(&self) -> Option<String> {
2123 self.ipc_endpoint.clone()
2124 }
2125
2126 pub fn http_url(&self) -> Option<String> {
2128 self.http_local_addr.map(|addr| format!("http://{addr}"))
2129 }
2130
2131 pub fn ws_url(&self) -> Option<String> {
2133 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
2134 }
2135
2136 pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
2138 let url = self.http_url()?;
2139
2140 let client = if let Some(token) = self.bearer_token() {
2141 jsonrpsee::http_client::HttpClientBuilder::default()
2142 .set_headers(HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]))
2143 .build(url)
2144 } else {
2145 jsonrpsee::http_client::HttpClientBuilder::default().build(url)
2146 };
2147
2148 client.expect("failed to create http client").into()
2149 }
2150
2151 pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
2153 let url = self.ws_url()?;
2154 let mut builder = jsonrpsee::ws_client::WsClientBuilder::default();
2155
2156 if let Some(token) = self.bearer_token() {
2157 let headers = HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]);
2158 builder = builder.set_headers(headers);
2159 }
2160
2161 let client = builder.build(url).await.expect("failed to create ws client");
2162 Some(client)
2163 }
2164
2165 pub fn eth_http_provider(
2167 &self,
2168 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2169 self.new_http_provider_for()
2170 }
2171
2172 pub fn eth_http_provider_with_wallet<W>(
2175 &self,
2176 wallet: W,
2177 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2178 where
2179 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2180 {
2181 let rpc_url = self.http_url()?;
2182 let provider =
2183 ProviderBuilder::new().wallet(wallet).connect_http(rpc_url.parse().expect("valid url"));
2184 Some(provider)
2185 }
2186
2187 pub fn new_http_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2192 where
2193 N: RecommendedFillers<RecommendedFillers: Unpin>,
2194 {
2195 let rpc_url = self.http_url()?;
2196 let provider = ProviderBuilder::default()
2197 .with_recommended_fillers()
2198 .connect_http(rpc_url.parse().expect("valid url"));
2199 Some(provider)
2200 }
2201
2202 pub async fn eth_ws_provider(
2204 &self,
2205 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2206 self.new_ws_provider_for().await
2207 }
2208
2209 pub async fn eth_ws_provider_with_wallet<W>(
2212 &self,
2213 wallet: W,
2214 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2215 where
2216 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2217 {
2218 let rpc_url = self.ws_url()?;
2219 let provider = ProviderBuilder::new()
2220 .wallet(wallet)
2221 .connect(&rpc_url)
2222 .await
2223 .expect("failed to create ws client");
2224 Some(provider)
2225 }
2226
2227 pub async fn new_ws_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2232 where
2233 N: RecommendedFillers<RecommendedFillers: Unpin>,
2234 {
2235 let rpc_url = self.ws_url()?;
2236 let provider = ProviderBuilder::default()
2237 .with_recommended_fillers()
2238 .connect(&rpc_url)
2239 .await
2240 .expect("failed to create ws client");
2241 Some(provider)
2242 }
2243
2244 pub async fn eth_ipc_provider(
2246 &self,
2247 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2248 self.new_ipc_provider_for().await
2249 }
2250
2251 pub async fn new_ipc_provider_for<N>(
2256 &self,
2257 ) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2258 where
2259 N: RecommendedFillers<RecommendedFillers: Unpin>,
2260 {
2261 let rpc_url = self.ipc_endpoint()?;
2262 let provider = ProviderBuilder::default()
2263 .with_recommended_fillers()
2264 .connect(&rpc_url)
2265 .await
2266 .expect("failed to create ipc client");
2267 Some(provider)
2268 }
2269}
2270
2271#[cfg(test)]
2272mod tests {
2273 use super::*;
2274
2275 #[test]
2276 fn parse_eth_call_bundle_selection() {
2277 let selection = "eth,admin,debug".parse::<RpcModuleSelection>().unwrap();
2278 assert_eq!(
2279 selection,
2280 RpcModuleSelection::Selection(
2281 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Debug,].into()
2282 )
2283 );
2284 }
2285
2286 #[test]
2287 fn parse_rpc_module_selection() {
2288 let selection = "all".parse::<RpcModuleSelection>().unwrap();
2289 assert_eq!(selection, RpcModuleSelection::All);
2290 }
2291
2292 #[test]
2293 fn parse_rpc_module_selection_none() {
2294 let selection = "none".parse::<RpcModuleSelection>().unwrap();
2295 assert_eq!(selection, RpcModuleSelection::Selection(Default::default()));
2296 }
2297
2298 #[test]
2299 fn parse_rpc_unique_module_selection() {
2300 let selection = "eth,admin,eth,net".parse::<RpcModuleSelection>().unwrap();
2301 assert_eq!(
2302 selection,
2303 RpcModuleSelection::Selection(
2304 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Net,].into()
2305 )
2306 );
2307 }
2308
2309 #[test]
2310 fn identical_selection() {
2311 assert!(RpcModuleSelection::are_identical(
2312 Some(&RpcModuleSelection::All),
2313 Some(&RpcModuleSelection::All),
2314 ));
2315 assert!(!RpcModuleSelection::are_identical(
2316 Some(&RpcModuleSelection::All),
2317 Some(&RpcModuleSelection::Standard),
2318 ));
2319 assert!(RpcModuleSelection::are_identical(
2320 Some(&RpcModuleSelection::Selection(RpcModuleSelection::Standard.to_selection())),
2321 Some(&RpcModuleSelection::Standard),
2322 ));
2323 assert!(RpcModuleSelection::are_identical(
2324 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2325 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2326 ));
2327 assert!(RpcModuleSelection::are_identical(
2328 None,
2329 Some(&RpcModuleSelection::Selection(Default::default())),
2330 ));
2331 assert!(RpcModuleSelection::are_identical(
2332 Some(&RpcModuleSelection::Selection(Default::default())),
2333 None,
2334 ));
2335 assert!(RpcModuleSelection::are_identical(None, None));
2336 }
2337
2338 #[test]
2339 fn test_rpc_module_str() {
2340 macro_rules! assert_rpc_module {
2341 ($($s:expr => $v:expr,)*) => {
2342 $(
2343 let val: RethRpcModule = $s.parse().unwrap();
2344 assert_eq!(val, $v);
2345 assert_eq!(val.to_string(), $s);
2346 )*
2347 };
2348 }
2349 assert_rpc_module!
2350 (
2351 "admin" => RethRpcModule::Admin,
2352 "debug" => RethRpcModule::Debug,
2353 "eth" => RethRpcModule::Eth,
2354 "net" => RethRpcModule::Net,
2355 "trace" => RethRpcModule::Trace,
2356 "web3" => RethRpcModule::Web3,
2357 "rpc" => RethRpcModule::Rpc,
2358 "ots" => RethRpcModule::Ots,
2359 "reth" => RethRpcModule::Reth,
2360 );
2361 }
2362
2363 #[test]
2364 fn test_default_selection() {
2365 let selection = RpcModuleSelection::Standard.to_selection();
2366 assert_eq!(selection, [RethRpcModule::Eth, RethRpcModule::Net, RethRpcModule::Web3].into())
2367 }
2368
2369 #[test]
2370 fn test_create_rpc_module_config() {
2371 let selection = vec!["eth", "admin"];
2372 let config = RpcModuleSelection::try_from_selection(selection).unwrap();
2373 assert_eq!(
2374 config,
2375 RpcModuleSelection::Selection([RethRpcModule::Eth, RethRpcModule::Admin].into())
2376 );
2377 }
2378
2379 #[test]
2380 fn test_configure_transport_config() {
2381 let config = TransportRpcModuleConfig::default()
2382 .with_http([RethRpcModule::Eth, RethRpcModule::Admin]);
2383 assert_eq!(
2384 config,
2385 TransportRpcModuleConfig {
2386 http: Some(RpcModuleSelection::Selection(
2387 [RethRpcModule::Eth, RethRpcModule::Admin].into()
2388 )),
2389 ws: None,
2390 ipc: None,
2391 config: None,
2392 }
2393 )
2394 }
2395
2396 #[test]
2397 fn test_configure_transport_config_none() {
2398 let config = TransportRpcModuleConfig::default().with_http(Vec::<RethRpcModule>::new());
2399 assert_eq!(
2400 config,
2401 TransportRpcModuleConfig {
2402 http: Some(RpcModuleSelection::Selection(Default::default())),
2403 ws: None,
2404 ipc: None,
2405 config: None,
2406 }
2407 )
2408 }
2409
2410 fn create_test_module() -> RpcModule<()> {
2411 let mut module = RpcModule::new(());
2412 module.register_method("anything", |_, _, _| "succeed").unwrap();
2413 module
2414 }
2415
2416 #[test]
2417 fn test_remove_http_method() {
2418 let mut modules =
2419 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2420 assert!(modules.remove_http_method("anything"));
2422
2423 assert!(!modules.remove_http_method("non_existent_method"));
2425
2426 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2428 }
2429
2430 #[test]
2431 fn test_remove_ws_method() {
2432 let mut modules =
2433 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2434
2435 assert!(modules.remove_ws_method("anything"));
2437
2438 assert!(!modules.remove_ws_method("non_existent_method"));
2440
2441 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2443 }
2444
2445 #[test]
2446 fn test_remove_ipc_method() {
2447 let mut modules =
2448 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2449
2450 assert!(modules.remove_ipc_method("anything"));
2452
2453 assert!(!modules.remove_ipc_method("non_existent_method"));
2455
2456 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2458 }
2459
2460 #[test]
2461 fn test_remove_method_from_configured() {
2462 let mut modules = TransportRpcModules {
2463 http: Some(create_test_module()),
2464 ws: Some(create_test_module()),
2465 ipc: Some(create_test_module()),
2466 ..Default::default()
2467 };
2468
2469 assert!(modules.remove_method_from_configured("anything"));
2471
2472 assert!(!modules.remove_method_from_configured("anything"));
2474
2475 assert!(!modules.remove_method_from_configured("non_existent_method"));
2477
2478 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2480 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2481 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2482 }
2483
2484 #[test]
2485 fn test_transport_rpc_module_rename() {
2486 let mut modules = TransportRpcModules {
2487 http: Some(create_test_module()),
2488 ws: Some(create_test_module()),
2489 ipc: Some(create_test_module()),
2490 ..Default::default()
2491 };
2492
2493 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2495 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2496 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2497
2498 assert!(modules.http.as_ref().unwrap().method("something").is_none());
2500 assert!(modules.ws.as_ref().unwrap().method("something").is_none());
2501 assert!(modules.ipc.as_ref().unwrap().method("something").is_none());
2502
2503 let mut other_module = RpcModule::new(());
2505 other_module.register_method("something", |_, _, _| "fails").unwrap();
2506
2507 modules.rename("anything", other_module).expect("rename failed");
2509
2510 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2512 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2513 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2514
2515 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2517 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2518 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2519 }
2520
2521 #[test]
2522 fn test_replace_http_method() {
2523 let mut modules =
2524 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2525
2526 let mut other_module = RpcModule::new(());
2527 other_module.register_method("something", |_, _, _| "fails").unwrap();
2528
2529 assert!(modules.replace_http(other_module.clone()).unwrap());
2530
2531 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2532
2533 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2534 assert!(modules.replace_http(other_module.clone()).unwrap());
2535
2536 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2537 }
2538 #[test]
2539 fn test_replace_ipc_method() {
2540 let mut modules =
2541 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2542
2543 let mut other_module = RpcModule::new(());
2544 other_module.register_method("something", |_, _, _| "fails").unwrap();
2545
2546 assert!(modules.replace_ipc(other_module.clone()).unwrap());
2547
2548 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2549
2550 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2551 assert!(modules.replace_ipc(other_module.clone()).unwrap());
2552
2553 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2554 }
2555 #[test]
2556 fn test_replace_ws_method() {
2557 let mut modules =
2558 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2559
2560 let mut other_module = RpcModule::new(());
2561 other_module.register_method("something", |_, _, _| "fails").unwrap();
2562
2563 assert!(modules.replace_ws(other_module.clone()).unwrap());
2564
2565 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2566
2567 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2568 assert!(modules.replace_ws(other_module.clone()).unwrap());
2569
2570 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2571 }
2572
2573 #[test]
2574 fn test_replace_configured() {
2575 let mut modules = TransportRpcModules {
2576 http: Some(create_test_module()),
2577 ws: Some(create_test_module()),
2578 ipc: Some(create_test_module()),
2579 ..Default::default()
2580 };
2581 let mut other_module = RpcModule::new(());
2582 other_module.register_method("something", |_, _, _| "fails").unwrap();
2583
2584 assert!(modules.replace_configured(other_module).unwrap());
2585
2586 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2588 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2589 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2590
2591 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2592 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2593 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2594 }
2595
2596 #[test]
2597 fn test_add_or_replace_if_module_configured() {
2598 let config = TransportRpcModuleConfig::default()
2600 .with_http([RethRpcModule::Eth])
2601 .with_ws([RethRpcModule::Eth]);
2602
2603 let mut http_module = RpcModule::new(());
2605 http_module.register_method("eth_existing", |_, _, _| "original").unwrap();
2606
2607 let mut ws_module = RpcModule::new(());
2609 ws_module.register_method("eth_existing", |_, _, _| "original").unwrap();
2610
2611 let ipc_module = RpcModule::new(());
2613
2614 let mut modules = TransportRpcModules {
2616 config,
2617 http: Some(http_module),
2618 ws: Some(ws_module),
2619 ipc: Some(ipc_module),
2620 };
2621
2622 let mut new_module = RpcModule::new(());
2624 new_module.register_method("eth_existing", |_, _, _| "replaced").unwrap(); new_module.register_method("eth_new", |_, _, _| "added").unwrap(); let new_methods: Methods = new_module.into();
2627
2628 let result = modules.add_or_replace_if_module_configured(RethRpcModule::Eth, new_methods);
2630 assert!(result.is_ok(), "Function should succeed");
2631
2632 let http = modules.http.as_ref().unwrap();
2634 assert!(http.method("eth_existing").is_some());
2635 assert!(http.method("eth_new").is_some());
2636
2637 let ws = modules.ws.as_ref().unwrap();
2639 assert!(ws.method("eth_existing").is_some());
2640 assert!(ws.method("eth_new").is_some());
2641
2642 let ipc = modules.ipc.as_ref().unwrap();
2644 assert!(ipc.method("eth_existing").is_none());
2645 assert!(ipc.method("eth_new").is_none());
2646 }
2647
2648 #[test]
2649 fn test_merge_if_module_configured_with_lazy_evaluation() {
2650 let config = TransportRpcModuleConfig::default().with_http([RethRpcModule::Eth]);
2652
2653 let mut modules =
2654 TransportRpcModules { config, http: Some(RpcModule::new(())), ws: None, ipc: None };
2655
2656 let mut closure_called = false;
2658
2659 let result = modules.merge_if_module_configured_with(RethRpcModule::Eth, || {
2661 closure_called = true;
2662 let mut methods = RpcModule::new(());
2663 methods.register_method("eth_test", |_, _, _| "test").unwrap();
2664 methods.into()
2665 });
2666
2667 assert!(result.is_ok());
2668 assert!(closure_called, "Closure should be called when module is configured");
2669 assert!(modules.http.as_ref().unwrap().method("eth_test").is_some());
2670
2671 closure_called = false;
2673 let result = modules.merge_if_module_configured_with(RethRpcModule::Debug, || {
2674 closure_called = true;
2675 RpcModule::new(()).into()
2676 });
2677
2678 assert!(result.is_ok());
2679 assert!(!closure_called, "Closure should NOT be called when module is not configured");
2680 }
2681}