1#![doc(
15 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
16 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
17 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
18)]
19#![cfg_attr(not(test), warn(unused_crate_dependencies))]
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
23use alloy_network::{Ethereum, IntoWallet};
24use alloy_provider::{fillers::RecommendedFillers, Provider, ProviderBuilder};
25use core::marker::PhantomData;
26use error::{ConflictingModules, RpcError, ServerKind};
27use http::{header::AUTHORIZATION, HeaderMap};
28use jsonrpsee::{
29 core::RegisterMethodError,
30 server::{middleware::rpc::RpcServiceBuilder, AlreadyStoppedError, IdProvider, ServerHandle},
31 Methods, RpcModule,
32};
33use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
34use reth_consensus::FullConsensus;
35use reth_engine_primitives::ConsensusEngineEvent;
36use reth_evm::ConfigureEvm;
37use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
38use reth_primitives_traits::{NodePrimitives, TxTy};
39use reth_rpc::{
40 AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
41 OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
42};
43use reth_rpc_api::servers::*;
44use reth_rpc_eth_api::{
45 helpers::{
46 pending_block::PendingEnvBuilder, Call, EthApiSpec, EthTransactions, LoadPendingBlock,
47 TraceExt,
48 },
49 node::RpcNodeCoreAdapter,
50 EthApiServer, EthApiTypes, FullEthApiServer, FullEthApiTypes, RpcBlock, RpcConvert,
51 RpcConverter, RpcHeader, RpcNodeCore, RpcReceipt, RpcTransaction, RpcTxReq,
52};
53use reth_rpc_eth_types::{receipt::EthReceiptConverter, EthConfig, EthSubscriptionIdProvider};
54use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
55pub use reth_rpc_server_types::RethRpcModule;
56use reth_storage_api::{
57 AccountReader, BlockReader, ChangeSetReader, FullRpcProvider, NodePrimitivesProvider,
58 StateProviderFactory,
59};
60use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
61use reth_tokio_util::EventSender;
62use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
63use serde::{Deserialize, Serialize};
64use std::{
65 collections::HashMap,
66 fmt::Debug,
67 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
68 time::{Duration, SystemTime, UNIX_EPOCH},
69};
70use tower_http::cors::CorsLayer;
71
72pub use cors::CorsDomainError;
73
74pub use jsonrpsee::server::ServerBuilder;
76use jsonrpsee::server::ServerConfigBuilder;
77pub use reth_ipc::server::{
78 Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
79};
80pub use reth_rpc_server_types::{constants, RpcModuleSelection};
81pub use tower::layer::util::{Identity, Stack};
82
83pub mod auth;
85
86pub mod config;
88
89pub mod middleware;
91
92mod cors;
94
95pub mod error;
97
98pub mod eth;
100pub use eth::EthHandlers;
101
102mod metrics;
104use crate::middleware::RethRpcMiddleware;
105pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
106use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions};
107use reth_rpc::eth::sim_bundle::EthSimBundle;
108
109pub mod rate_limiter;
111
112#[derive(Debug, Clone)]
116pub struct RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus> {
117 provider: Provider,
119 pool: Pool,
121 network: Network,
123 executor: Box<dyn TaskSpawner + 'static>,
125 evm_config: EvmConfig,
127 consensus: Consensus,
129 _primitives: PhantomData<N>,
131}
132
133impl<N, Provider, Pool, Network, EvmConfig, Consensus>
136 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
137{
138 pub const fn new(
140 provider: Provider,
141 pool: Pool,
142 network: Network,
143 executor: Box<dyn TaskSpawner + 'static>,
144 evm_config: EvmConfig,
145 consensus: Consensus,
146 ) -> Self {
147 Self { provider, pool, network, executor, evm_config, consensus, _primitives: PhantomData }
148 }
149
150 pub fn with_provider<P>(
152 self,
153 provider: P,
154 ) -> RpcModuleBuilder<N, P, Pool, Network, EvmConfig, Consensus> {
155 let Self { pool, network, executor, evm_config, consensus, _primitives, .. } = self;
156 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
157 }
158
159 pub fn with_pool<P>(
161 self,
162 pool: P,
163 ) -> RpcModuleBuilder<N, Provider, P, Network, EvmConfig, Consensus> {
164 let Self { provider, network, executor, evm_config, consensus, _primitives, .. } = self;
165 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
166 }
167
168 pub fn with_noop_pool(
174 self,
175 ) -> RpcModuleBuilder<N, Provider, NoopTransactionPool, Network, EvmConfig, Consensus> {
176 let Self { provider, executor, network, evm_config, consensus, _primitives, .. } = self;
177 RpcModuleBuilder {
178 provider,
179 executor,
180 network,
181 evm_config,
182 pool: NoopTransactionPool::default(),
183 consensus,
184 _primitives,
185 }
186 }
187
188 pub fn with_network<Net>(
190 self,
191 network: Net,
192 ) -> RpcModuleBuilder<N, Provider, Pool, Net, EvmConfig, Consensus> {
193 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
194 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
195 }
196
197 pub fn with_noop_network(
203 self,
204 ) -> RpcModuleBuilder<N, Provider, Pool, NoopNetwork, EvmConfig, Consensus> {
205 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
206 RpcModuleBuilder {
207 provider,
208 pool,
209 executor,
210 network: NoopNetwork::default(),
211 evm_config,
212 consensus,
213 _primitives,
214 }
215 }
216
217 pub fn with_executor(self, executor: Box<dyn TaskSpawner + 'static>) -> Self {
219 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
220 Self { provider, network, pool, executor, evm_config, consensus, _primitives }
221 }
222
223 pub fn with_tokio_executor(self) -> Self {
228 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
229 Self {
230 provider,
231 network,
232 pool,
233 executor: Box::new(TokioTaskExecutor::default()),
234 evm_config,
235 consensus,
236 _primitives,
237 }
238 }
239
240 pub fn with_evm_config<E>(
242 self,
243 evm_config: E,
244 ) -> RpcModuleBuilder<N, Provider, Pool, Network, E, Consensus> {
245 let Self { provider, pool, executor, network, consensus, _primitives, .. } = self;
246 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
247 }
248
249 pub fn with_consensus<C>(
251 self,
252 consensus: C,
253 ) -> RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, C> {
254 let Self { provider, network, pool, executor, evm_config, _primitives, .. } = self;
255 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
256 }
257
258 #[expect(clippy::type_complexity)]
260 pub fn eth_api_builder<ChainSpec>(
261 &self,
262 ) -> EthApiBuilder<
263 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
264 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
265 >
266 where
267 Provider: Clone,
268 Pool: Clone,
269 Network: Clone,
270 EvmConfig: Clone,
271 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
272 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
273 {
274 EthApiBuilder::new(
275 self.provider.clone(),
276 self.pool.clone(),
277 self.network.clone(),
278 self.evm_config.clone(),
279 )
280 }
281
282 #[expect(clippy::type_complexity)]
288 pub fn bootstrap_eth_api<ChainSpec>(
289 &self,
290 ) -> EthApi<
291 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>,
292 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>,
293 >
294 where
295 Provider: Clone,
296 Pool: Clone,
297 Network: Clone,
298 EvmConfig: ConfigureEvm + Clone,
299 RpcNodeCoreAdapter<Provider, Pool, Network, EvmConfig>:
300 RpcNodeCore<Provider: ChainSpecProvider<ChainSpec = ChainSpec>, Evm = EvmConfig>,
301 RpcConverter<Ethereum, EvmConfig, EthReceiptConverter<ChainSpec>>: RpcConvert,
302 (): PendingEnvBuilder<EvmConfig>,
303 {
304 self.eth_api_builder().build()
305 }
306}
307
308impl<N, Provider, Pool, Network, EvmConfig, Consensus>
309 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
310where
311 N: NodePrimitives,
312 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
313 + CanonStateSubscriptions<Primitives = N>
314 + PersistedBlockSubscriptions
315 + AccountReader
316 + ChangeSetReader,
317 Pool: TransactionPool + Clone + 'static,
318 Network: NetworkInfo + Peers + Clone + 'static,
319 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
320 Consensus: FullConsensus<N> + Clone + 'static,
321{
322 pub fn build_with_auth_server<EthApi>(
329 self,
330 module_config: TransportRpcModuleConfig,
331 engine: impl IntoEngineApiRpcModule,
332 eth: EthApi,
333 engine_events: EventSender<ConsensusEngineEvent<N>>,
334 ) -> (
335 TransportRpcModules,
336 AuthRpcModule,
337 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>,
338 )
339 where
340 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
341 {
342 let config = module_config.config.clone().unwrap_or_default();
343
344 let mut registry = self.into_registry(config, eth, engine_events);
345 let modules = registry.create_transport_rpc_modules(module_config);
346 let auth_module = registry.create_auth_module(engine);
347
348 (modules, auth_module, registry)
349 }
350
351 pub fn into_registry<EthApi>(
356 self,
357 config: RpcModuleConfig,
358 eth: EthApi,
359 engine_events: EventSender<ConsensusEngineEvent<N>>,
360 ) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
361 where
362 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
363 {
364 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
365 RpcRegistryInner::new(
366 provider,
367 pool,
368 network,
369 executor,
370 consensus,
371 config,
372 evm_config,
373 eth,
374 engine_events,
375 )
376 }
377
378 pub fn build<EthApi>(
381 self,
382 module_config: TransportRpcModuleConfig,
383 eth: EthApi,
384 engine_events: EventSender<ConsensusEngineEvent<N>>,
385 ) -> TransportRpcModules<()>
386 where
387 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
388 {
389 let mut modules = TransportRpcModules::default();
390
391 if !module_config.is_empty() {
392 let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
393
394 let mut registry = self.into_registry(config.unwrap_or_default(), eth, engine_events);
395
396 modules.config = module_config;
397 modules.http = registry.maybe_module(http.as_ref());
398 modules.ws = registry.maybe_module(ws.as_ref());
399 modules.ipc = registry.maybe_module(ipc.as_ref());
400 }
401
402 modules
403 }
404}
405
406impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
407 fn default() -> Self {
408 Self::new((), (), (), Box::new(TokioTaskExecutor::default()), (), ())
409 }
410}
411
412#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
414pub struct RpcModuleConfig {
415 eth: EthConfig,
417}
418
419impl RpcModuleConfig {
422 pub fn builder() -> RpcModuleConfigBuilder {
424 RpcModuleConfigBuilder::default()
425 }
426
427 pub const fn new(eth: EthConfig) -> Self {
429 Self { eth }
430 }
431
432 pub const fn eth(&self) -> &EthConfig {
434 &self.eth
435 }
436
437 pub const fn eth_mut(&mut self) -> &mut EthConfig {
439 &mut self.eth
440 }
441}
442
443#[derive(Clone, Debug, Default)]
445pub struct RpcModuleConfigBuilder {
446 eth: Option<EthConfig>,
447}
448
449impl RpcModuleConfigBuilder {
452 pub fn eth(mut self, eth: EthConfig) -> Self {
454 self.eth = Some(eth);
455 self
456 }
457
458 pub fn build(self) -> RpcModuleConfig {
460 let Self { eth } = self;
461 RpcModuleConfig { eth: eth.unwrap_or_default() }
462 }
463
464 pub const fn get_eth(&self) -> Option<&EthConfig> {
466 self.eth.as_ref()
467 }
468
469 pub const fn eth_mut(&mut self) -> &mut Option<EthConfig> {
471 &mut self.eth
472 }
473
474 pub fn eth_mut_or_default(&mut self) -> &mut EthConfig {
476 self.eth.get_or_insert_with(EthConfig::default)
477 }
478}
479
480#[derive(Debug)]
482pub struct RpcRegistryInner<Provider, Pool, Network, EthApi: EthApiTypes, EvmConfig, Consensus> {
483 provider: Provider,
484 pool: Pool,
485 network: Network,
486 executor: Box<dyn TaskSpawner + 'static>,
487 evm_config: EvmConfig,
488 consensus: Consensus,
489 eth: EthHandlers<EthApi>,
491 blocking_pool_guard: BlockingTaskGuard,
493 modules: HashMap<RethRpcModule, Methods>,
495 eth_config: EthConfig,
497 engine_events:
499 EventSender<ConsensusEngineEvent<<EthApi::RpcConvert as RpcConvert>::Primitives>>,
500}
501
502impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
505 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
506where
507 N: NodePrimitives,
508 Provider: StateProviderFactory
509 + CanonStateSubscriptions<Primitives = N>
510 + BlockReader<Block = N::Block, Receipt = N::Receipt>
511 + Clone
512 + Unpin
513 + 'static,
514 Pool: Send + Sync + Clone + 'static,
515 Network: Clone + 'static,
516 EthApi: FullEthApiTypes + 'static,
517 EvmConfig: ConfigureEvm<Primitives = N>,
518{
519 #[expect(clippy::too_many_arguments)]
521 pub fn new(
522 provider: Provider,
523 pool: Pool,
524 network: Network,
525 executor: Box<dyn TaskSpawner + 'static>,
526 consensus: Consensus,
527 config: RpcModuleConfig,
528 evm_config: EvmConfig,
529 eth_api: EthApi,
530 engine_events: EventSender<
531 ConsensusEngineEvent<<EthApi::Provider as NodePrimitivesProvider>::Primitives>,
532 >,
533 ) -> Self
534 where
535 EvmConfig: ConfigureEvm<Primitives = N>,
536 {
537 let blocking_pool_guard = BlockingTaskGuard::new(config.eth.max_tracing_requests);
538
539 let eth = EthHandlers::bootstrap(config.eth.clone(), executor.clone(), eth_api);
540
541 Self {
542 provider,
543 pool,
544 network,
545 eth,
546 executor,
547 consensus,
548 modules: Default::default(),
549 blocking_pool_guard,
550 eth_config: config.eth,
551 evm_config,
552 engine_events,
553 }
554 }
555}
556
557impl<Provider, Pool, Network, EthApi, Evm, Consensus>
558 RpcRegistryInner<Provider, Pool, Network, EthApi, Evm, Consensus>
559where
560 EthApi: EthApiTypes,
561{
562 pub const fn eth_api(&self) -> &EthApi {
564 &self.eth.api
565 }
566
567 pub const fn eth_handlers(&self) -> &EthHandlers<EthApi> {
569 &self.eth
570 }
571
572 pub const fn pool(&self) -> &Pool {
574 &self.pool
575 }
576
577 pub const fn tasks(&self) -> &(dyn TaskSpawner + 'static) {
579 &*self.executor
580 }
581
582 pub const fn provider(&self) -> &Provider {
584 &self.provider
585 }
586
587 pub const fn evm_config(&self) -> &Evm {
589 &self.evm_config
590 }
591
592 pub fn methods(&self) -> Vec<Methods> {
594 self.modules.values().cloned().collect()
595 }
596
597 pub fn module(&self) -> RpcModule<()> {
599 let mut module = RpcModule::new(());
600 for methods in self.modules.values().cloned() {
601 module.merge(methods).expect("No conflicts");
602 }
603 module
604 }
605}
606
607impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
608 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
609where
610 Network: NetworkInfo + Clone + 'static,
611 EthApi: EthApiTypes,
612 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks>,
613 EvmConfig: ConfigureEvm,
614{
615 pub fn admin_api(&self) -> AdminApi<Network, Provider::ChainSpec, Pool>
617 where
618 Network: Peers,
619 Pool: TransactionPool + Clone + 'static,
620 {
621 AdminApi::new(self.network.clone(), self.provider.chain_spec(), self.pool.clone())
622 }
623
624 pub fn web3_api(&self) -> Web3Api<Network> {
626 Web3Api::new(self.network.clone())
627 }
628
629 pub fn register_admin(&mut self) -> &mut Self
631 where
632 Network: Peers,
633 Pool: TransactionPool + Clone + 'static,
634 {
635 let adminapi = self.admin_api();
636 self.modules.insert(RethRpcModule::Admin, adminapi.into_rpc().into());
637 self
638 }
639
640 pub fn register_web3(&mut self) -> &mut Self {
642 let web3api = self.web3_api();
643 self.modules.insert(RethRpcModule::Web3, web3api.into_rpc().into());
644 self
645 }
646}
647
648impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
649 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
650where
651 N: NodePrimitives,
652 Provider: FullRpcProvider<
653 Header = N::BlockHeader,
654 Block = N::Block,
655 Receipt = N::Receipt,
656 Transaction = N::SignedTx,
657 > + AccountReader
658 + ChangeSetReader
659 + CanonStateSubscriptions
660 + PersistedBlockSubscriptions,
661 Network: NetworkInfo + Peers + Clone + 'static,
662 EthApi: EthApiServer<
663 RpcTxReq<EthApi::NetworkTypes>,
664 RpcTransaction<EthApi::NetworkTypes>,
665 RpcBlock<EthApi::NetworkTypes>,
666 RpcReceipt<EthApi::NetworkTypes>,
667 RpcHeader<EthApi::NetworkTypes>,
668 TxTy<N>,
669 > + EthApiTypes,
670 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
671{
672 pub fn register_eth(&mut self) -> &mut Self {
678 let eth_api = self.eth_api().clone();
679 self.modules.insert(RethRpcModule::Eth, eth_api.into_rpc().into());
680 self
681 }
682
683 pub fn register_ots(&mut self) -> &mut Self
689 where
690 EthApi: TraceExt + EthTransactions<Primitives = N>,
691 {
692 let otterscan_api = self.otterscan_api();
693 self.modules.insert(RethRpcModule::Ots, otterscan_api.into_rpc().into());
694 self
695 }
696
697 pub fn register_debug(&mut self) -> &mut Self
703 where
704 EthApi: EthTransactions + TraceExt,
705 {
706 let debug_api = self.debug_api();
707 self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
708 self
709 }
710
711 pub fn register_trace(&mut self) -> &mut Self
717 where
718 EthApi: TraceExt,
719 {
720 let trace_api = self.trace_api();
721 self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
722 self
723 }
724
725 pub fn register_net(&mut self) -> &mut Self
733 where
734 EthApi: EthApiSpec + 'static,
735 {
736 let netapi = self.net_api();
737 self.modules.insert(RethRpcModule::Net, netapi.into_rpc().into());
738 self
739 }
740
741 pub fn register_reth(&mut self) -> &mut Self {
749 let rethapi = self.reth_api();
750 self.modules.insert(RethRpcModule::Reth, rethapi.into_rpc().into());
751 self
752 }
753
754 pub fn otterscan_api(&self) -> OtterscanApi<EthApi> {
760 let eth_api = self.eth_api().clone();
761 OtterscanApi::new(eth_api)
762 }
763}
764
765impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
766 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
767where
768 N: NodePrimitives,
769 Provider: FullRpcProvider<
770 Block = N::Block,
771 Header = N::BlockHeader,
772 Transaction = N::SignedTx,
773 Receipt = N::Receipt,
774 > + AccountReader
775 + ChangeSetReader,
776 Network: NetworkInfo + Peers + Clone + 'static,
777 EthApi: EthApiTypes,
778 EvmConfig: ConfigureEvm<Primitives = N>,
779{
780 pub fn trace_api(&self) -> TraceApi<EthApi> {
786 TraceApi::new(
787 self.eth_api().clone(),
788 self.blocking_pool_guard.clone(),
789 self.eth_config.clone(),
790 )
791 }
792
793 pub fn bundle_api(&self) -> EthBundle<EthApi>
799 where
800 EthApi: EthTransactions + LoadPendingBlock + Call,
801 {
802 let eth_api = self.eth_api().clone();
803 EthBundle::new(eth_api, self.blocking_pool_guard.clone())
804 }
805
806 pub fn debug_api(&self) -> DebugApi<EthApi>
812 where
813 EthApi: FullEthApiTypes,
814 {
815 DebugApi::new(
816 self.eth_api().clone(),
817 self.blocking_pool_guard.clone(),
818 self.tasks(),
819 self.engine_events.new_listener(),
820 )
821 }
822
823 pub fn net_api(&self) -> NetApi<Network, EthApi>
829 where
830 EthApi: EthApiSpec + 'static,
831 {
832 let eth_api = self.eth_api().clone();
833 NetApi::new(self.network.clone(), eth_api)
834 }
835
836 pub fn reth_api(&self) -> RethApi<Provider> {
838 RethApi::new(self.provider.clone(), self.executor.clone())
839 }
840}
841
842impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
843 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
844where
845 N: NodePrimitives,
846 Provider: FullRpcProvider<Block = N::Block>
847 + CanonStateSubscriptions<Primitives = N>
848 + PersistedBlockSubscriptions
849 + AccountReader
850 + ChangeSetReader,
851 Pool: TransactionPool + Clone + 'static,
852 Network: NetworkInfo + Peers + Clone + 'static,
853 EthApi: FullEthApiServer,
854 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
855 Consensus: FullConsensus<N> + Clone + 'static,
856{
857 pub fn create_auth_module(&self, engine_api: impl IntoEngineApiRpcModule) -> AuthRpcModule {
863 let mut module = engine_api.into_rpc_module();
864
865 let eth_handlers = self.eth_handlers();
867 let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
868
869 module.merge(engine_eth.into_rpc()).expect("No conflicting methods");
870
871 AuthRpcModule { inner: module }
872 }
873
874 fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
876 config.map(|config| self.module_for(config))
877 }
878
879 pub fn create_transport_rpc_modules(
883 &mut self,
884 config: TransportRpcModuleConfig,
885 ) -> TransportRpcModules<()> {
886 let mut modules = TransportRpcModules::default();
887 let http = self.maybe_module(config.http.as_ref());
888 let ws = self.maybe_module(config.ws.as_ref());
889 let ipc = self.maybe_module(config.ipc.as_ref());
890
891 modules.config = config;
892 modules.http = http;
893 modules.ws = ws;
894 modules.ipc = ipc;
895 modules
896 }
897
898 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
901 let mut module = RpcModule::new(());
902 let all_methods = self.reth_methods(config.iter_selection());
903 for methods in all_methods {
904 module.merge(methods).expect("No conflicts");
905 }
906 module
907 }
908
909 pub fn reth_methods(
918 &mut self,
919 namespaces: impl Iterator<Item = RethRpcModule>,
920 ) -> Vec<Methods> {
921 let EthHandlers { api: eth_api, filter: eth_filter, pubsub: eth_pubsub, .. } =
922 self.eth_handlers().clone();
923
924 let namespaces: Vec<_> = namespaces.collect();
926 namespaces
927 .iter()
928 .map(|namespace| {
929 self.modules
930 .entry(namespace.clone())
931 .or_insert_with(|| match namespace.clone() {
932 RethRpcModule::Admin => AdminApi::new(
933 self.network.clone(),
934 self.provider.chain_spec(),
935 self.pool.clone(),
936 )
937 .into_rpc()
938 .into(),
939 RethRpcModule::Debug => DebugApi::new(
940 eth_api.clone(),
941 self.blocking_pool_guard.clone(),
942 &*self.executor,
943 self.engine_events.new_listener(),
944 )
945 .into_rpc()
946 .into(),
947 RethRpcModule::Eth => {
948 let mut module = eth_api.clone().into_rpc();
950 module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
951 module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
952 module
953 .merge(
954 EthBundle::new(
955 eth_api.clone(),
956 self.blocking_pool_guard.clone(),
957 )
958 .into_rpc(),
959 )
960 .expect("No conflicts");
961
962 module.into()
963 }
964 RethRpcModule::Net => {
965 NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
966 }
967 RethRpcModule::Trace => TraceApi::new(
968 eth_api.clone(),
969 self.blocking_pool_guard.clone(),
970 self.eth_config.clone(),
971 )
972 .into_rpc()
973 .into(),
974 RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
975 RethRpcModule::Txpool => TxPoolApi::new(
976 self.eth.api.pool().clone(),
977 dyn_clone::clone(self.eth.api.converter()),
978 )
979 .into_rpc()
980 .into(),
981 RethRpcModule::Rpc => RPCApi::new(
982 namespaces
983 .iter()
984 .map(|module| (module.to_string(), "1.0".to_string()))
985 .collect(),
986 )
987 .into_rpc()
988 .into(),
989 RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
990 RethRpcModule::Reth => {
991 RethApi::new(self.provider.clone(), self.executor.clone())
992 .into_rpc()
993 .into()
994 }
995 RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
996 RethRpcModule::Mev => {
997 EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
998 .into_rpc()
999 .into()
1000 }
1001 RethRpcModule::Flashbots |
1005 RethRpcModule::Testing |
1006 RethRpcModule::Other(_) => Default::default(),
1007 })
1008 .clone()
1009 })
1010 .collect::<Vec<_>>()
1011 }
1012}
1013
1014impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus> Clone
1015 for RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
1016where
1017 EthApi: EthApiTypes,
1018 Provider: Clone,
1019 Pool: Clone,
1020 Network: Clone,
1021 EvmConfig: Clone,
1022 Consensus: Clone,
1023{
1024 fn clone(&self) -> Self {
1025 Self {
1026 provider: self.provider.clone(),
1027 pool: self.pool.clone(),
1028 network: self.network.clone(),
1029 executor: self.executor.clone(),
1030 evm_config: self.evm_config.clone(),
1031 consensus: self.consensus.clone(),
1032 eth: self.eth.clone(),
1033 blocking_pool_guard: self.blocking_pool_guard.clone(),
1034 modules: self.modules.clone(),
1035 eth_config: self.eth_config.clone(),
1036 engine_events: self.engine_events.clone(),
1037 }
1038 }
1039}
1040
1041#[derive(Debug)]
1053pub struct RpcServerConfig<RpcMiddleware = Identity> {
1054 http_server_config: Option<ServerConfigBuilder>,
1056 http_cors_domains: Option<String>,
1058 http_addr: Option<SocketAddr>,
1060 http_disable_compression: bool,
1062 ws_server_config: Option<ServerConfigBuilder>,
1064 ws_cors_domains: Option<String>,
1066 ws_addr: Option<SocketAddr>,
1068 ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
1070 ipc_endpoint: Option<String>,
1072 jwt_secret: Option<JwtSecret>,
1074 rpc_middleware: RpcMiddleware,
1076}
1077
1078impl Default for RpcServerConfig<Identity> {
1081 fn default() -> Self {
1083 Self {
1084 http_server_config: None,
1085 http_cors_domains: None,
1086 http_addr: None,
1087 http_disable_compression: false,
1088 ws_server_config: None,
1089 ws_cors_domains: None,
1090 ws_addr: None,
1091 ipc_server_config: None,
1092 ipc_endpoint: None,
1093 jwt_secret: None,
1094 rpc_middleware: Default::default(),
1095 }
1096 }
1097}
1098
1099impl RpcServerConfig {
1100 pub fn http(config: ServerConfigBuilder) -> Self {
1102 Self::default().with_http(config)
1103 }
1104
1105 pub fn ws(config: ServerConfigBuilder) -> Self {
1107 Self::default().with_ws(config)
1108 }
1109
1110 pub fn ipc(config: IpcServerBuilder<Identity, Identity>) -> Self {
1112 Self::default().with_ipc(config)
1113 }
1114
1115 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
1120 self.http_server_config =
1121 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1122 self
1123 }
1124
1125 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
1130 self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1131 self
1132 }
1133
1134 pub fn with_ipc(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
1139 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1140 self
1141 }
1142}
1143
1144impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
1145 pub fn set_rpc_middleware<T>(self, rpc_middleware: T) -> RpcServerConfig<T> {
1147 RpcServerConfig {
1148 http_server_config: self.http_server_config,
1149 http_cors_domains: self.http_cors_domains,
1150 http_addr: self.http_addr,
1151 http_disable_compression: self.http_disable_compression,
1152 ws_server_config: self.ws_server_config,
1153 ws_cors_domains: self.ws_cors_domains,
1154 ws_addr: self.ws_addr,
1155 ipc_server_config: self.ipc_server_config,
1156 ipc_endpoint: self.ipc_endpoint,
1157 jwt_secret: self.jwt_secret,
1158 rpc_middleware,
1159 }
1160 }
1161
1162 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
1164 self.with_http_cors(cors_domain.clone()).with_ws_cors(cors_domain)
1165 }
1166
1167 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
1169 self.ws_cors_domains = cors_domain;
1170 self
1171 }
1172
1173 pub const fn with_http_disable_compression(mut self, http_disable_compression: bool) -> Self {
1175 self.http_disable_compression = http_disable_compression;
1176 self
1177 }
1178
1179 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
1181 self.http_cors_domains = cors_domain;
1182 self
1183 }
1184
1185 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
1190 self.http_addr = Some(addr);
1191 self
1192 }
1193
1194 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
1199 self.ws_addr = Some(addr);
1200 self
1201 }
1202
1203 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
1207 where
1208 I: IdProvider + Clone + 'static,
1209 {
1210 if let Some(config) = self.http_server_config {
1211 self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
1212 }
1213 if let Some(config) = self.ws_server_config {
1214 self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
1215 }
1216 if let Some(ipc) = self.ipc_server_config {
1217 self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
1218 }
1219
1220 self
1221 }
1222
1223 pub fn with_ipc_endpoint(mut self, path: impl Into<String>) -> Self {
1227 self.ipc_endpoint = Some(path.into());
1228 self
1229 }
1230
1231 pub const fn with_jwt_secret(mut self, secret: Option<JwtSecret>) -> Self {
1233 self.jwt_secret = secret;
1234 self
1235 }
1236
1237 pub fn with_tokio_runtime(mut self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
1239 let Some(tokio_runtime) = tokio_runtime else { return self };
1240 if let Some(http_server_config) = self.http_server_config {
1241 self.http_server_config =
1242 Some(http_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1243 }
1244 if let Some(ws_server_config) = self.ws_server_config {
1245 self.ws_server_config =
1246 Some(ws_server_config.custom_tokio_runtime(tokio_runtime.clone()));
1247 }
1248 if let Some(ipc_server_config) = self.ipc_server_config {
1249 self.ipc_server_config = Some(ipc_server_config.custom_tokio_runtime(tokio_runtime));
1250 }
1251 self
1252 }
1253
1254 pub const fn has_server(&self) -> bool {
1258 self.http_server_config.is_some() ||
1259 self.ws_server_config.is_some() ||
1260 self.ipc_server_config.is_some()
1261 }
1262
1263 pub const fn http_address(&self) -> Option<SocketAddr> {
1265 self.http_addr
1266 }
1267
1268 pub const fn ws_address(&self) -> Option<SocketAddr> {
1270 self.ws_addr
1271 }
1272
1273 pub fn ipc_endpoint(&self) -> Option<String> {
1275 self.ipc_endpoint.clone()
1276 }
1277
1278 fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
1280 cors.as_deref().map(cors::create_cors_layer).transpose()
1281 }
1282
1283 fn maybe_jwt_layer(jwt_secret: Option<JwtSecret>) -> Option<AuthLayer<JwtAuthValidator>> {
1285 jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
1286 }
1287
1288 fn maybe_compression_layer(disable_compression: bool) -> Option<CompressionLayer> {
1291 if disable_compression {
1292 None
1293 } else {
1294 Some(CompressionLayer::new())
1295 }
1296 }
1297
    /// Builds and starts the configured servers with the given `modules`.
    ///
    /// The ipc server, if configured, is started first. If both an http and a ws server are
    /// configured and their configured addresses compare equal, a single server is built on the
    /// http socket address and its handle is shared by both transports. Otherwise the ws and
    /// http servers are built and started separately.
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// - the ipc server fails to start,
    /// - http and ws share an address but their CORS domains or module selections conflict,
    /// - a CORS layer cannot be created from the configured domains,
    /// - a server cannot be built or its local address cannot be obtained.
    ///
    /// # Panics
    ///
    /// Panics if the ipc server is configured but `modules` has no ipc module, or if the http/ws
    /// servers are started separately and the matching module is missing from `modules`.
    pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError>
    where
        RpcMiddleware: RethRpcMiddleware,
    {
        let mut http_handle = None;
        let mut ws_handle = None;
        let mut ipc_handle = None;

        // Fall back to localhost with the default http port if no address was configured.
        let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
            Ipv4Addr::LOCALHOST,
            constants::DEFAULT_HTTP_RPC_PORT,
        )));

        // Fall back to localhost with the default ws port if no address was configured.
        let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
            Ipv4Addr::LOCALHOST,
            constants::DEFAULT_WS_RPC_PORT,
        )));

        // Request metrics for the ipc module; defaults if no ipc module is present.
        let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
        let ipc_path =
            self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());

        // The ipc server is independent of http/ws and is started in either of the paths below.
        if let Some(builder) = self.ipc_server_config {
            let ipc = builder
                .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
                .build(ipc_path);
            ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
        }

        // Both transports are configured with equal addresses: serve them from one server.
        // Note that this compares the configured options, not the resolved socket addresses.
        if self.http_addr == self.ws_addr &&
            self.http_server_config.is_some() &&
            self.ws_server_config.is_some()
        {
            // A shared server can only have one CORS setting, so differing domains are rejected.
            let cors = match (self.ws_cors_domains.as_ref(), self.http_cors_domains.as_ref()) {
                (Some(ws_cors), Some(http_cors)) => {
                    if ws_cors.trim() != http_cors.trim() {
                        return Err(WsHttpSamePortError::ConflictingCorsDomains {
                            http_cors_domains: Some(http_cors.clone()),
                            ws_cors_domains: Some(ws_cors.clone()),
                        }
                        .into());
                    }
                    Some(ws_cors)
                }
                // At most one side is set: use whichever is present.
                (a, b) => a.or(b),
            }
            .cloned();

            // The module selections of both transports must match on a shared server.
            modules.config.ensure_ws_http_identical()?;

            if let Some(config) = self.http_server_config {
                let server = ServerBuilder::new()
                    .set_http_middleware(
                        tower::ServiceBuilder::new()
                            .option_layer(Self::maybe_cors_layer(cors)?)
                            .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
                            .option_layer(Self::maybe_compression_layer(
                                self.http_disable_compression,
                            )),
                    )
                    .set_rpc_middleware(
                        RpcServiceBuilder::default()
                            .layer(
                                modules
                                    .http
                                    .as_ref()
                                    .or(modules.ws.as_ref())
                                    .map(RpcRequestMetrics::same_port)
                                    .unwrap_or_default(),
                            )
                            .layer(self.rpc_middleware.clone()),
                    )
                    .set_config(config.build())
                    .build(http_socket_addr)
                    .await
                    .map_err(|err| {
                        RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
                    })?;
                let addr = server.local_addr().map_err(|err| {
                    RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
                })?;
                // Prefer the http module, fall back to ws; both handles refer to the same server.
                if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
                    let handle = server.start(module.clone());
                    http_handle = Some(handle.clone());
                    ws_handle = Some(handle);
                }
                return Ok(RpcServerHandle {
                    http_local_addr: Some(addr),
                    ws_local_addr: Some(addr),
                    http: http_handle,
                    ws: ws_handle,
                    ipc_endpoint: self.ipc_endpoint.clone(),
                    ipc: ipc_handle,
                    jwt_secret: self.jwt_secret,
                });
            }
        }

        // Separate servers: build both first, start them afterwards.
        let mut ws_local_addr = None;
        let mut ws_server = None;
        let mut http_local_addr = None;
        let mut http_server = None;

        if let Some(config) = self.ws_server_config {
            let server = ServerBuilder::new()
                .set_config(config.ws_only().build())
                .set_http_middleware(
                    tower::ServiceBuilder::new()
                        .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
                        .option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
                )
                .set_rpc_middleware(
                    RpcServiceBuilder::default()
                        .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default())
                        .layer(self.rpc_middleware.clone()),
                )
                .build(ws_socket_addr)
                .await
                .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

            let addr = server
                .local_addr()
                .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

            ws_local_addr = Some(addr);
            ws_server = Some(server);
        }

        if let Some(config) = self.http_server_config {
            let server = ServerBuilder::new()
                .set_config(config.http_only().build())
                .set_http_middleware(
                    tower::ServiceBuilder::new()
                        .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
                        .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
                        .option_layer(Self::maybe_compression_layer(self.http_disable_compression)),
                )
                .set_rpc_middleware(
                    RpcServiceBuilder::default()
                        .layer(
                            modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
                        )
                        .layer(self.rpc_middleware.clone()),
                )
                .build(http_socket_addr)
                .await
                .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
            let local_addr = server
                .local_addr()
                .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
            http_local_addr = Some(local_addr);
            http_server = Some(server);
        }

        // A built server without a matching module is treated as a bug, hence the `expect`s.
        http_handle = http_server
            .map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
        ws_handle = ws_server
            .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
        Ok(RpcServerHandle {
            http_local_addr,
            ws_local_addr,
            http: http_handle,
            ws: ws_handle,
            ipc_endpoint: self.ipc_endpoint.clone(),
            ipc: ipc_handle,
            jwt_secret: self.jwt_secret,
        })
    }
1473}
1474
/// Holds the module selection for each transport (http, ws, ipc) together with an optional
/// [`RpcModuleConfig`].
///
/// A transport whose selection is `None` has no modules configured.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct TransportRpcModuleConfig {
    /// Modules selected for the http transport.
    http: Option<RpcModuleSelection>,
    /// Modules selected for the ws transport.
    ws: Option<RpcModuleSelection>,
    /// Modules selected for the ipc transport.
    ipc: Option<RpcModuleSelection>,
    /// Optional configuration for the modules themselves.
    config: Option<RpcModuleConfig>,
}
1497
1498impl TransportRpcModuleConfig {
1501 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
1503 Self::default().with_http(http)
1504 }
1505
1506 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
1508 Self::default().with_ws(ws)
1509 }
1510
1511 pub fn set_ipc(ipc: impl Into<RpcModuleSelection>) -> Self {
1513 Self::default().with_ipc(ipc)
1514 }
1515
1516 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
1518 self.http = Some(http.into());
1519 self
1520 }
1521
1522 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
1524 self.ws = Some(ws.into());
1525 self
1526 }
1527
1528 pub fn with_ipc(mut self, ipc: impl Into<RpcModuleSelection>) -> Self {
1530 self.ipc = Some(ipc.into());
1531 self
1532 }
1533
1534 pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
1536 self.config = Some(config);
1537 self
1538 }
1539
1540 pub const fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1542 &mut self.http
1543 }
1544
1545 pub const fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1547 &mut self.ws
1548 }
1549
1550 pub const fn ipc_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1552 &mut self.ipc
1553 }
1554
1555 pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
1557 &mut self.config
1558 }
1559
1560 pub const fn is_empty(&self) -> bool {
1562 self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
1563 }
1564
1565 pub const fn http(&self) -> Option<&RpcModuleSelection> {
1567 self.http.as_ref()
1568 }
1569
1570 pub const fn ws(&self) -> Option<&RpcModuleSelection> {
1572 self.ws.as_ref()
1573 }
1574
1575 pub const fn ipc(&self) -> Option<&RpcModuleSelection> {
1577 self.ipc.as_ref()
1578 }
1579
1580 pub const fn config(&self) -> Option<&RpcModuleConfig> {
1582 self.config.as_ref()
1583 }
1584
1585 pub fn contains_any(&self, module: &RethRpcModule) -> bool {
1587 self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
1588 }
1589
1590 pub fn contains_http(&self, module: &RethRpcModule) -> bool {
1592 self.http.as_ref().is_some_and(|http| http.contains(module))
1593 }
1594
1595 pub fn contains_ws(&self, module: &RethRpcModule) -> bool {
1597 self.ws.as_ref().is_some_and(|ws| ws.contains(module))
1598 }
1599
1600 pub fn contains_ipc(&self, module: &RethRpcModule) -> bool {
1602 self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
1603 }
1604
1605 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
1608 if RpcModuleSelection::are_identical(self.http.as_ref(), self.ws.as_ref()) {
1609 Ok(())
1610 } else {
1611 let http_modules =
1612 self.http.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1613 let ws_modules =
1614 self.ws.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1615
1616 let http_not_ws = http_modules.difference(&ws_modules).cloned().collect();
1617 let ws_not_http = ws_modules.difference(&http_modules).cloned().collect();
1618 let overlap = http_modules.intersection(&ws_modules).cloned().collect();
1619
1620 Err(WsHttpSamePortError::ConflictingModules(Box::new(ConflictingModules {
1621 overlap,
1622 http_not_ws,
1623 ws_not_http,
1624 })))
1625 }
1626 }
1627}
1628
/// Holds the installed [`RpcModule`] for each transport (http, ws, ipc) together with the
/// [`TransportRpcModuleConfig`] describing which modules are selected per transport.
#[derive(Debug, Clone, Default)]
pub struct TransportRpcModules<Context = ()> {
    /// The module selection per transport.
    config: TransportRpcModuleConfig,
    /// Methods served over http, if any.
    http: Option<RpcModule<Context>>,
    /// Methods served over ws, if any.
    ws: Option<RpcModule<Context>>,
    /// Methods served over ipc, if any.
    ipc: Option<RpcModule<Context>>,
}
1641
1642impl TransportRpcModules {
1645 pub fn with_config(mut self, config: TransportRpcModuleConfig) -> Self {
1648 self.config = config;
1649 self
1650 }
1651
1652 pub fn with_http(mut self, http: RpcModule<()>) -> Self {
1655 self.http = Some(http);
1656 self
1657 }
1658
1659 pub fn with_ws(mut self, ws: RpcModule<()>) -> Self {
1662 self.ws = Some(ws);
1663 self
1664 }
1665
1666 pub fn with_ipc(mut self, ipc: RpcModule<()>) -> Self {
1669 self.ipc = Some(ipc);
1670 self
1671 }
1672
1673 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
1675 &self.config
1676 }
1677
1678 pub fn merge_if_module_configured(
1683 &mut self,
1684 module: RethRpcModule,
1685 other: impl Into<Methods>,
1686 ) -> Result<(), RegisterMethodError> {
1687 let other = other.into();
1688 if self.module_config().contains_http(&module) {
1689 self.merge_http(other.clone())?;
1690 }
1691 if self.module_config().contains_ws(&module) {
1692 self.merge_ws(other.clone())?;
1693 }
1694 if self.module_config().contains_ipc(&module) {
1695 self.merge_ipc(other)?;
1696 }
1697
1698 Ok(())
1699 }
1700
1701 pub fn merge_if_module_configured_with<F>(
1708 &mut self,
1709 module: RethRpcModule,
1710 f: F,
1711 ) -> Result<(), RegisterMethodError>
1712 where
1713 F: FnOnce() -> Methods,
1714 {
1715 if !self.module_config().contains_any(&module) {
1717 return Ok(());
1718 }
1719 self.merge_if_module_configured(module, f())
1720 }
1721
1722 pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1728 if let Some(ref mut http) = self.http {
1729 return http.merge(other.into()).map(|_| true)
1730 }
1731 Ok(false)
1732 }
1733
1734 pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1740 if let Some(ref mut ws) = self.ws {
1741 return ws.merge(other.into()).map(|_| true)
1742 }
1743 Ok(false)
1744 }
1745
1746 pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1752 if let Some(ref mut ipc) = self.ipc {
1753 return ipc.merge(other.into()).map(|_| true)
1754 }
1755 Ok(false)
1756 }
1757
1758 pub fn merge_configured(
1762 &mut self,
1763 other: impl Into<Methods>,
1764 ) -> Result<(), RegisterMethodError> {
1765 let other = other.into();
1766 self.merge_http(other.clone())?;
1767 self.merge_ws(other.clone())?;
1768 self.merge_ipc(other)?;
1769 Ok(())
1770 }
1771
1772 pub fn methods_by_module(&self, module: RethRpcModule) -> Methods {
1776 self.methods_by(|name| name.starts_with(module.as_str()))
1777 }
1778
1779 pub fn methods_by<F>(&self, mut filter: F) -> Methods
1783 where
1784 F: FnMut(&str) -> bool,
1785 {
1786 let mut methods = Methods::new();
1787
1788 let mut f =
1790 |name: &str, mm: &Methods| filter(name) && !mm.method_names().any(|m| m == name);
1791
1792 if let Some(m) = self.http_methods(|name| f(name, &methods)) {
1793 let _ = methods.merge(m);
1794 }
1795 if let Some(m) = self.ws_methods(|name| f(name, &methods)) {
1796 let _ = methods.merge(m);
1797 }
1798 if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
1799 let _ = methods.merge(m);
1800 }
1801 methods
1802 }
1803
1804 pub fn http_methods<F>(&self, filter: F) -> Option<Methods>
1808 where
1809 F: FnMut(&str) -> bool,
1810 {
1811 self.http.as_ref().map(|module| methods_by(module, filter))
1812 }
1813
1814 pub fn ws_methods<F>(&self, filter: F) -> Option<Methods>
1818 where
1819 F: FnMut(&str) -> bool,
1820 {
1821 self.ws.as_ref().map(|module| methods_by(module, filter))
1822 }
1823
1824 pub fn ipc_methods<F>(&self, filter: F) -> Option<Methods>
1828 where
1829 F: FnMut(&str) -> bool,
1830 {
1831 self.ipc.as_ref().map(|module| methods_by(module, filter))
1832 }
1833
1834 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
1842 if let Some(http_module) = &mut self.http {
1843 http_module.remove_method(method_name).is_some()
1844 } else {
1845 false
1846 }
1847 }
1848
1849 pub fn remove_http_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1851 for name in methods {
1852 self.remove_http_method(name);
1853 }
1854 }
1855
1856 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
1864 if let Some(ws_module) = &mut self.ws {
1865 ws_module.remove_method(method_name).is_some()
1866 } else {
1867 false
1868 }
1869 }
1870
1871 pub fn remove_ws_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1873 for name in methods {
1874 self.remove_ws_method(name);
1875 }
1876 }
1877
1878 pub fn remove_ipc_method(&mut self, method_name: &'static str) -> bool {
1886 if let Some(ipc_module) = &mut self.ipc {
1887 ipc_module.remove_method(method_name).is_some()
1888 } else {
1889 false
1890 }
1891 }
1892
1893 pub fn remove_ipc_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1895 for name in methods {
1896 self.remove_ipc_method(name);
1897 }
1898 }
1899
1900 pub fn remove_method_from_configured(&mut self, method_name: &'static str) -> bool {
1904 let http_removed = self.remove_http_method(method_name);
1905 let ws_removed = self.remove_ws_method(method_name);
1906 let ipc_removed = self.remove_ipc_method(method_name);
1907
1908 http_removed || ws_removed || ipc_removed
1909 }
1910
1911 pub fn rename(
1915 &mut self,
1916 old_name: &'static str,
1917 new_method: impl Into<Methods>,
1918 ) -> Result<(), RegisterMethodError> {
1919 self.remove_method_from_configured(old_name);
1921
1922 self.merge_configured(new_method)
1924 }
1925
1926 pub fn replace_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1933 let other = other.into();
1934 self.remove_http_methods(other.method_names());
1935 self.merge_http(other)
1936 }
1937
1938 pub fn replace_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1945 let other = other.into();
1946 self.remove_ipc_methods(other.method_names());
1947 self.merge_ipc(other)
1948 }
1949
1950 pub fn replace_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1957 let other = other.into();
1958 self.remove_ws_methods(other.method_names());
1959 self.merge_ws(other)
1960 }
1961
1962 pub fn replace_configured(
1966 &mut self,
1967 other: impl Into<Methods>,
1968 ) -> Result<bool, RegisterMethodError> {
1969 let other = other.into();
1970 self.replace_http(other.clone())?;
1971 self.replace_ws(other.clone())?;
1972 self.replace_ipc(other)?;
1973 Ok(true)
1974 }
1975
1976 pub fn add_or_replace_http(
1980 &mut self,
1981 other: impl Into<Methods>,
1982 ) -> Result<bool, RegisterMethodError> {
1983 let other = other.into();
1984 self.remove_http_methods(other.method_names());
1985 self.merge_http(other)
1986 }
1987
1988 pub fn add_or_replace_ws(
1992 &mut self,
1993 other: impl Into<Methods>,
1994 ) -> Result<bool, RegisterMethodError> {
1995 let other = other.into();
1996 self.remove_ws_methods(other.method_names());
1997 self.merge_ws(other)
1998 }
1999
2000 pub fn add_or_replace_ipc(
2004 &mut self,
2005 other: impl Into<Methods>,
2006 ) -> Result<bool, RegisterMethodError> {
2007 let other = other.into();
2008 self.remove_ipc_methods(other.method_names());
2009 self.merge_ipc(other)
2010 }
2011
2012 pub fn add_or_replace_configured(
2014 &mut self,
2015 other: impl Into<Methods>,
2016 ) -> Result<(), RegisterMethodError> {
2017 let other = other.into();
2018 self.add_or_replace_http(other.clone())?;
2019 self.add_or_replace_ws(other.clone())?;
2020 self.add_or_replace_ipc(other)?;
2021 Ok(())
2022 }
2023 pub fn add_or_replace_if_module_configured(
2026 &mut self,
2027 module: RethRpcModule,
2028 other: impl Into<Methods>,
2029 ) -> Result<(), RegisterMethodError> {
2030 let other = other.into();
2031 if self.module_config().contains_http(&module) {
2032 self.add_or_replace_http(other.clone())?;
2033 }
2034 if self.module_config().contains_ws(&module) {
2035 self.add_or_replace_ws(other.clone())?;
2036 }
2037 if self.module_config().contains_ipc(&module) {
2038 self.add_or_replace_ipc(other)?;
2039 }
2040 Ok(())
2041 }
2042}
2043
2044fn methods_by<T, F>(module: &RpcModule<T>, mut filter: F) -> Methods
2046where
2047 F: FnMut(&str) -> bool,
2048{
2049 let mut methods = Methods::new();
2050 let method_names = module.method_names().filter(|name| filter(name));
2051
2052 for name in method_names {
2053 if let Some(matched_method) = module.method(name).cloned() {
2054 let _ = methods.verify_and_insert(name, matched_method);
2055 }
2056 }
2057
2058 methods
2059}
2060
/// A handle to the started http, ws and ipc servers.
///
/// Besides the per-transport [`ServerHandle`]s it keeps the local addresses, the ipc endpoint and
/// the optional JWT secret, which are used to derive urls and create clients.
#[derive(Clone, Debug)]
#[must_use = "Server stops if dropped"]
pub struct RpcServerHandle {
    /// The local address of the http server, if one was built.
    http_local_addr: Option<SocketAddr>,
    /// The local address of the ws server, if one was built.
    ws_local_addr: Option<SocketAddr>,
    /// Handle of the http server, if started.
    http: Option<ServerHandle>,
    /// Handle of the ws server, if started.
    ws: Option<ServerHandle>,
    /// The configured ipc endpoint, if any.
    ipc_endpoint: Option<String>,
    /// Handle of the ipc server, if started.
    ipc: Option<jsonrpsee::server::ServerHandle>,
    /// Secret used to create bearer tokens for clients, if any.
    jwt_secret: Option<JwtSecret>,
}
2077
2078impl RpcServerHandle {
2081 fn bearer_token(&self) -> Option<String> {
2083 self.jwt_secret.as_ref().map(|secret| {
2084 format!(
2085 "Bearer {}",
2086 secret
2087 .encode(&Claims {
2088 iat: (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +
2089 Duration::from_secs(60))
2090 .as_secs(),
2091 exp: None,
2092 })
2093 .unwrap()
2094 )
2095 })
2096 }
2097 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
2099 self.http_local_addr
2100 }
2101
2102 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
2104 self.ws_local_addr
2105 }
2106
2107 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
2109 if let Some(handle) = self.http {
2110 handle.stop()?
2111 }
2112
2113 if let Some(handle) = self.ws {
2114 handle.stop()?
2115 }
2116
2117 if let Some(handle) = self.ipc {
2118 handle.stop()?
2119 }
2120
2121 Ok(())
2122 }
2123
2124 pub fn ipc_endpoint(&self) -> Option<String> {
2126 self.ipc_endpoint.clone()
2127 }
2128
2129 pub fn http_url(&self) -> Option<String> {
2131 self.http_local_addr.map(|addr| format!("http://{addr}"))
2132 }
2133
2134 pub fn ws_url(&self) -> Option<String> {
2136 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
2137 }
2138
2139 pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
2141 let url = self.http_url()?;
2142
2143 let client = if let Some(token) = self.bearer_token() {
2144 jsonrpsee::http_client::HttpClientBuilder::default()
2145 .set_headers(HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]))
2146 .build(url)
2147 } else {
2148 jsonrpsee::http_client::HttpClientBuilder::default().build(url)
2149 };
2150
2151 client.expect("failed to create http client").into()
2152 }
2153
2154 pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
2156 let url = self.ws_url()?;
2157 let mut builder = jsonrpsee::ws_client::WsClientBuilder::default();
2158
2159 if let Some(token) = self.bearer_token() {
2160 let headers = HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]);
2161 builder = builder.set_headers(headers);
2162 }
2163
2164 let client = builder.build(url).await.expect("failed to create ws client");
2165 Some(client)
2166 }
2167
2168 pub fn eth_http_provider(
2170 &self,
2171 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2172 self.new_http_provider_for()
2173 }
2174
2175 pub fn eth_http_provider_with_wallet<W>(
2178 &self,
2179 wallet: W,
2180 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2181 where
2182 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2183 {
2184 let rpc_url = self.http_url()?;
2185 let provider =
2186 ProviderBuilder::new().wallet(wallet).connect_http(rpc_url.parse().expect("valid url"));
2187 Some(provider)
2188 }
2189
2190 pub fn new_http_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2195 where
2196 N: RecommendedFillers<RecommendedFillers: Unpin>,
2197 {
2198 let rpc_url = self.http_url()?;
2199 let provider = ProviderBuilder::default()
2200 .with_recommended_fillers()
2201 .connect_http(rpc_url.parse().expect("valid url"));
2202 Some(provider)
2203 }
2204
2205 pub async fn eth_ws_provider(
2207 &self,
2208 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2209 self.new_ws_provider_for().await
2210 }
2211
2212 pub async fn eth_ws_provider_with_wallet<W>(
2215 &self,
2216 wallet: W,
2217 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static>
2218 where
2219 W: IntoWallet<alloy_network::Ethereum, NetworkWallet: Clone + Unpin + 'static>,
2220 {
2221 let rpc_url = self.ws_url()?;
2222 let provider = ProviderBuilder::new()
2223 .wallet(wallet)
2224 .connect(&rpc_url)
2225 .await
2226 .expect("failed to create ws client");
2227 Some(provider)
2228 }
2229
2230 pub async fn new_ws_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2235 where
2236 N: RecommendedFillers<RecommendedFillers: Unpin>,
2237 {
2238 let rpc_url = self.ws_url()?;
2239 let provider = ProviderBuilder::default()
2240 .with_recommended_fillers()
2241 .connect(&rpc_url)
2242 .await
2243 .expect("failed to create ws client");
2244 Some(provider)
2245 }
2246
2247 pub async fn eth_ipc_provider(
2249 &self,
2250 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2251 self.new_ipc_provider_for().await
2252 }
2253
2254 pub async fn new_ipc_provider_for<N>(
2259 &self,
2260 ) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2261 where
2262 N: RecommendedFillers<RecommendedFillers: Unpin>,
2263 {
2264 let rpc_url = self.ipc_endpoint()?;
2265 let provider = ProviderBuilder::default()
2266 .with_recommended_fillers()
2267 .connect(&rpc_url)
2268 .await
2269 .expect("failed to create ipc client");
2270 Some(provider)
2271 }
2272}
2273
2274#[cfg(test)]
2275mod tests {
2276 use super::*;
2277
2278 #[test]
2279 fn parse_eth_call_bundle_selection() {
2280 let selection = "eth,admin,debug".parse::<RpcModuleSelection>().unwrap();
2281 assert_eq!(
2282 selection,
2283 RpcModuleSelection::Selection(
2284 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Debug,].into()
2285 )
2286 );
2287 }
2288
2289 #[test]
2290 fn parse_rpc_module_selection() {
2291 let selection = "all".parse::<RpcModuleSelection>().unwrap();
2292 assert_eq!(selection, RpcModuleSelection::All);
2293 }
2294
2295 #[test]
2296 fn parse_rpc_module_selection_none() {
2297 let selection = "none".parse::<RpcModuleSelection>().unwrap();
2298 assert_eq!(selection, RpcModuleSelection::Selection(Default::default()));
2299 }
2300
2301 #[test]
2302 fn parse_rpc_unique_module_selection() {
2303 let selection = "eth,admin,eth,net".parse::<RpcModuleSelection>().unwrap();
2304 assert_eq!(
2305 selection,
2306 RpcModuleSelection::Selection(
2307 [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Net,].into()
2308 )
2309 );
2310 }
2311
2312 #[test]
2313 fn identical_selection() {
2314 assert!(RpcModuleSelection::are_identical(
2315 Some(&RpcModuleSelection::All),
2316 Some(&RpcModuleSelection::All),
2317 ));
2318 assert!(!RpcModuleSelection::are_identical(
2319 Some(&RpcModuleSelection::All),
2320 Some(&RpcModuleSelection::Standard),
2321 ));
2322 assert!(RpcModuleSelection::are_identical(
2323 Some(&RpcModuleSelection::Selection(RpcModuleSelection::Standard.to_selection())),
2324 Some(&RpcModuleSelection::Standard),
2325 ));
2326 assert!(RpcModuleSelection::are_identical(
2327 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2328 Some(&RpcModuleSelection::Selection([RethRpcModule::Eth].into())),
2329 ));
2330 assert!(RpcModuleSelection::are_identical(
2331 None,
2332 Some(&RpcModuleSelection::Selection(Default::default())),
2333 ));
2334 assert!(RpcModuleSelection::are_identical(
2335 Some(&RpcModuleSelection::Selection(Default::default())),
2336 None,
2337 ));
2338 assert!(RpcModuleSelection::are_identical(None, None));
2339 }
2340
2341 #[test]
2342 fn test_rpc_module_str() {
2343 macro_rules! assert_rpc_module {
2344 ($($s:expr => $v:expr,)*) => {
2345 $(
2346 let val: RethRpcModule = $s.parse().unwrap();
2347 assert_eq!(val, $v);
2348 assert_eq!(val.to_string(), $s);
2349 )*
2350 };
2351 }
2352 assert_rpc_module!
2353 (
2354 "admin" => RethRpcModule::Admin,
2355 "debug" => RethRpcModule::Debug,
2356 "eth" => RethRpcModule::Eth,
2357 "net" => RethRpcModule::Net,
2358 "trace" => RethRpcModule::Trace,
2359 "web3" => RethRpcModule::Web3,
2360 "rpc" => RethRpcModule::Rpc,
2361 "ots" => RethRpcModule::Ots,
2362 "reth" => RethRpcModule::Reth,
2363 );
2364 }
2365
2366 #[test]
2367 fn test_default_selection() {
2368 let selection = RpcModuleSelection::Standard.to_selection();
2369 assert_eq!(selection, [RethRpcModule::Eth, RethRpcModule::Net, RethRpcModule::Web3].into())
2370 }
2371
2372 #[test]
2373 fn test_create_rpc_module_config() {
2374 let selection = vec!["eth", "admin"];
2375 let config = RpcModuleSelection::try_from_selection(selection).unwrap();
2376 assert_eq!(
2377 config,
2378 RpcModuleSelection::Selection([RethRpcModule::Eth, RethRpcModule::Admin].into())
2379 );
2380 }
2381
2382 #[test]
2383 fn test_configure_transport_config() {
2384 let config = TransportRpcModuleConfig::default()
2385 .with_http([RethRpcModule::Eth, RethRpcModule::Admin]);
2386 assert_eq!(
2387 config,
2388 TransportRpcModuleConfig {
2389 http: Some(RpcModuleSelection::Selection(
2390 [RethRpcModule::Eth, RethRpcModule::Admin].into()
2391 )),
2392 ws: None,
2393 ipc: None,
2394 config: None,
2395 }
2396 )
2397 }
2398
2399 #[test]
2400 fn test_configure_transport_config_none() {
2401 let config = TransportRpcModuleConfig::default().with_http(Vec::<RethRpcModule>::new());
2402 assert_eq!(
2403 config,
2404 TransportRpcModuleConfig {
2405 http: Some(RpcModuleSelection::Selection(Default::default())),
2406 ws: None,
2407 ipc: None,
2408 config: None,
2409 }
2410 )
2411 }
2412
2413 fn create_test_module() -> RpcModule<()> {
2414 let mut module = RpcModule::new(());
2415 module.register_method("anything", |_, _, _| "succeed").unwrap();
2416 module
2417 }
2418
2419 #[test]
2420 fn test_remove_http_method() {
2421 let mut modules =
2422 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2423 assert!(modules.remove_http_method("anything"));
2425
2426 assert!(!modules.remove_http_method("non_existent_method"));
2428
2429 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2431 }
2432
2433 #[test]
2434 fn test_remove_ws_method() {
2435 let mut modules =
2436 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2437
2438 assert!(modules.remove_ws_method("anything"));
2440
2441 assert!(!modules.remove_ws_method("non_existent_method"));
2443
2444 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2446 }
2447
2448 #[test]
2449 fn test_remove_ipc_method() {
2450 let mut modules =
2451 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2452
2453 assert!(modules.remove_ipc_method("anything"));
2455
2456 assert!(!modules.remove_ipc_method("non_existent_method"));
2458
2459 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2461 }
2462
2463 #[test]
2464 fn test_remove_method_from_configured() {
2465 let mut modules = TransportRpcModules {
2466 http: Some(create_test_module()),
2467 ws: Some(create_test_module()),
2468 ipc: Some(create_test_module()),
2469 ..Default::default()
2470 };
2471
2472 assert!(modules.remove_method_from_configured("anything"));
2474
2475 assert!(!modules.remove_method_from_configured("anything"));
2477
2478 assert!(!modules.remove_method_from_configured("non_existent_method"));
2480
2481 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2483 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2484 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2485 }
2486
2487 #[test]
2488 fn test_transport_rpc_module_rename() {
2489 let mut modules = TransportRpcModules {
2490 http: Some(create_test_module()),
2491 ws: Some(create_test_module()),
2492 ipc: Some(create_test_module()),
2493 ..Default::default()
2494 };
2495
2496 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2498 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2499 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2500
2501 assert!(modules.http.as_ref().unwrap().method("something").is_none());
2503 assert!(modules.ws.as_ref().unwrap().method("something").is_none());
2504 assert!(modules.ipc.as_ref().unwrap().method("something").is_none());
2505
2506 let mut other_module = RpcModule::new(());
2508 other_module.register_method("something", |_, _, _| "fails").unwrap();
2509
2510 modules.rename("anything", other_module).expect("rename failed");
2512
2513 assert!(modules.http.as_ref().unwrap().method("anything").is_none());
2515 assert!(modules.ws.as_ref().unwrap().method("anything").is_none());
2516 assert!(modules.ipc.as_ref().unwrap().method("anything").is_none());
2517
2518 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2520 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2521 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2522 }
2523
2524 #[test]
2525 fn test_replace_http_method() {
2526 let mut modules =
2527 TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
2528
2529 let mut other_module = RpcModule::new(());
2530 other_module.register_method("something", |_, _, _| "fails").unwrap();
2531
2532 assert!(modules.replace_http(other_module.clone()).unwrap());
2533
2534 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2535
2536 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2537 assert!(modules.replace_http(other_module.clone()).unwrap());
2538
2539 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2540 }
2541 #[test]
2542 fn test_replace_ipc_method() {
2543 let mut modules =
2544 TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
2545
2546 let mut other_module = RpcModule::new(());
2547 other_module.register_method("something", |_, _, _| "fails").unwrap();
2548
2549 assert!(modules.replace_ipc(other_module.clone()).unwrap());
2550
2551 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2552
2553 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2554 assert!(modules.replace_ipc(other_module.clone()).unwrap());
2555
2556 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2557 }
2558 #[test]
2559 fn test_replace_ws_method() {
2560 let mut modules =
2561 TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
2562
2563 let mut other_module = RpcModule::new(());
2564 other_module.register_method("something", |_, _, _| "fails").unwrap();
2565
2566 assert!(modules.replace_ws(other_module.clone()).unwrap());
2567
2568 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2569
2570 other_module.register_method("anything", |_, _, _| "fails").unwrap();
2571 assert!(modules.replace_ws(other_module.clone()).unwrap());
2572
2573 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2574 }
2575
2576 #[test]
2577 fn test_replace_configured() {
2578 let mut modules = TransportRpcModules {
2579 http: Some(create_test_module()),
2580 ws: Some(create_test_module()),
2581 ipc: Some(create_test_module()),
2582 ..Default::default()
2583 };
2584 let mut other_module = RpcModule::new(());
2585 other_module.register_method("something", |_, _, _| "fails").unwrap();
2586
2587 assert!(modules.replace_configured(other_module).unwrap());
2588
2589 assert!(modules.http.as_ref().unwrap().method("something").is_some());
2591 assert!(modules.ipc.as_ref().unwrap().method("something").is_some());
2592 assert!(modules.ws.as_ref().unwrap().method("something").is_some());
2593
2594 assert!(modules.http.as_ref().unwrap().method("anything").is_some());
2595 assert!(modules.ipc.as_ref().unwrap().method("anything").is_some());
2596 assert!(modules.ws.as_ref().unwrap().method("anything").is_some());
2597 }
2598
2599 #[test]
2600 fn test_add_or_replace_if_module_configured() {
2601 let config = TransportRpcModuleConfig::default()
2603 .with_http([RethRpcModule::Eth])
2604 .with_ws([RethRpcModule::Eth]);
2605
2606 let mut http_module = RpcModule::new(());
2608 http_module.register_method("eth_existing", |_, _, _| "original").unwrap();
2609
2610 let mut ws_module = RpcModule::new(());
2612 ws_module.register_method("eth_existing", |_, _, _| "original").unwrap();
2613
2614 let ipc_module = RpcModule::new(());
2616
2617 let mut modules = TransportRpcModules {
2619 config,
2620 http: Some(http_module),
2621 ws: Some(ws_module),
2622 ipc: Some(ipc_module),
2623 };
2624
2625 let mut new_module = RpcModule::new(());
2627 new_module.register_method("eth_existing", |_, _, _| "replaced").unwrap(); new_module.register_method("eth_new", |_, _, _| "added").unwrap(); let new_methods: Methods = new_module.into();
2630
2631 let result = modules.add_or_replace_if_module_configured(RethRpcModule::Eth, new_methods);
2633 assert!(result.is_ok(), "Function should succeed");
2634
2635 let http = modules.http.as_ref().unwrap();
2637 assert!(http.method("eth_existing").is_some());
2638 assert!(http.method("eth_new").is_some());
2639
2640 let ws = modules.ws.as_ref().unwrap();
2642 assert!(ws.method("eth_existing").is_some());
2643 assert!(ws.method("eth_new").is_some());
2644
2645 let ipc = modules.ipc.as_ref().unwrap();
2647 assert!(ipc.method("eth_existing").is_none());
2648 assert!(ipc.method("eth_new").is_none());
2649 }
2650
2651 #[test]
2652 fn test_merge_if_module_configured_with_lazy_evaluation() {
2653 let config = TransportRpcModuleConfig::default().with_http([RethRpcModule::Eth]);
2655
2656 let mut modules =
2657 TransportRpcModules { config, http: Some(RpcModule::new(())), ws: None, ipc: None };
2658
2659 let mut closure_called = false;
2661
2662 let result = modules.merge_if_module_configured_with(RethRpcModule::Eth, || {
2664 closure_called = true;
2665 let mut methods = RpcModule::new(());
2666 methods.register_method("eth_test", |_, _, _| "test").unwrap();
2667 methods.into()
2668 });
2669
2670 assert!(result.is_ok());
2671 assert!(closure_called, "Closure should be called when module is configured");
2672 assert!(modules.http.as_ref().unwrap().method("eth_test").is_some());
2673
2674 closure_called = false;
2676 let result = modules.merge_if_module_configured_with(RethRpcModule::Debug, || {
2677 closure_called = true;
2678 RpcModule::new(()).into()
2679 });
2680
2681 assert!(result.is_ok());
2682 assert!(!closure_called, "Closure should NOT be called when module is not configured");
2683 }
2684}