1pub use jsonrpsee::{
4 core::middleware::layer::Either,
5 server::middleware::rpc::{RpcService, RpcServiceBuilder},
6};
7use reth_engine_tree::tree::WaitForCaches;
8pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
9pub use reth_rpc_builder::{
10 middleware::{RethAuthHttpMiddleware, RethRpcMiddleware},
11 Identity, Stack,
12};
13pub use reth_trie_db::ChangesetCache;
14
15use crate::{
16 invalid_block_hook::InvalidBlockHookExt, ConfigureEngineEvm, ConsensusEngineEvent,
17 ConsensusEngineHandle,
18};
19use alloy_rpc_types::engine::ClientVersionV1;
20use alloy_rpc_types_engine::ExecutionData;
21use jsonrpsee::RpcModule;
22use parking_lot::Mutex;
23use reth_chain_state::CanonStateSubscriptions;
24use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks, Hardforks};
25use reth_node_api::{
26 AddOnsContext, BlockTy, EngineApiValidator, EngineTypes, FullNodeComponents, FullNodeTypes,
27 NodeAddOns, NodeTypes, PayloadTypes, PayloadValidator, PrimitivesTy, TreeConfig,
28};
29use reth_node_core::{
30 cli::config::RethTransactionPoolConfig,
31 node_config::NodeConfig,
32 version::{version_metadata, CLIENT_CODE},
33};
34use reth_payload_builder::{PayloadBuilderHandle, PayloadStore};
35use reth_rpc::{
36 eth::{core::EthRpcConverterFor, DevSigner, EthApiTypes, FullEthApiServer},
37 AdminApi,
38};
39use reth_rpc_api::{eth::helpers::EthTransactions, IntoEngineApiRpcModule};
40use reth_rpc_builder::{
41 auth::{AuthRpcModule, AuthServerHandle},
42 config::RethRpcServerConfig,
43 RpcModuleBuilder, RpcRegistryInner, RpcServerConfig, RpcServerHandle, TransportRpcModules,
44};
45use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
46use reth_rpc_eth_types::{cache::cache_new_blocks_task, EthConfig, EthStateCache};
47use reth_tokio_util::EventSender;
48use reth_tracing::tracing::{debug, info};
49use std::{
50 fmt::{self, Debug},
51 future::Future,
52 ops::{Deref, DerefMut},
53 sync::Arc,
54};
55use tokio::sync::oneshot;
56
57#[derive(Debug, Clone)]
61pub struct RethRpcServerHandles {
62 pub rpc: RpcServerHandle,
64 pub auth: AuthServerHandle,
66}
67
68pub struct RpcHooks<Node: FullNodeComponents, EthApi> {
70 pub on_rpc_started: Box<dyn OnRpcStarted<Node, EthApi>>,
72 pub extend_rpc_modules: Box<dyn ExtendRpcModules<Node, EthApi>>,
74}
75
76impl<Node, EthApi> Default for RpcHooks<Node, EthApi>
77where
78 Node: FullNodeComponents,
79 EthApi: EthApiTypes,
80{
81 fn default() -> Self {
82 Self { on_rpc_started: Box::<()>::default(), extend_rpc_modules: Box::<()>::default() }
83 }
84}
85
86impl<Node, EthApi> RpcHooks<Node, EthApi>
87where
88 Node: FullNodeComponents,
89 EthApi: EthApiTypes,
90{
91 pub(crate) fn set_on_rpc_started<F>(&mut self, hook: F) -> &mut Self
93 where
94 F: OnRpcStarted<Node, EthApi> + 'static,
95 {
96 self.on_rpc_started = Box::new(hook);
97 self
98 }
99
100 #[expect(unused)]
102 pub(crate) fn on_rpc_started<F>(mut self, hook: F) -> Self
103 where
104 F: OnRpcStarted<Node, EthApi> + 'static,
105 {
106 self.set_on_rpc_started(hook);
107 self
108 }
109
110 pub(crate) fn set_extend_rpc_modules<F>(&mut self, hook: F) -> &mut Self
112 where
113 F: ExtendRpcModules<Node, EthApi> + 'static,
114 {
115 self.extend_rpc_modules = Box::new(hook);
116 self
117 }
118
119 #[expect(unused)]
121 pub(crate) fn extend_rpc_modules<F>(mut self, hook: F) -> Self
122 where
123 F: ExtendRpcModules<Node, EthApi> + 'static,
124 {
125 self.set_extend_rpc_modules(hook);
126 self
127 }
128}
129
130impl<Node, EthApi> fmt::Debug for RpcHooks<Node, EthApi>
131where
132 Node: FullNodeComponents,
133 EthApi: EthApiTypes,
134{
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 f.debug_struct("RpcHooks")
137 .field("on_rpc_started", &"...")
138 .field("extend_rpc_modules", &"...")
139 .finish()
140 }
141}
142
143pub trait OnRpcStarted<Node: FullNodeComponents, EthApi: EthApiTypes>: Send {
145 fn on_rpc_started(
147 self: Box<Self>,
148 ctx: RpcContext<'_, Node, EthApi>,
149 handles: RethRpcServerHandles,
150 ) -> eyre::Result<()>;
151}
152
153impl<Node, EthApi, F> OnRpcStarted<Node, EthApi> for F
154where
155 F: FnOnce(RpcContext<'_, Node, EthApi>, RethRpcServerHandles) -> eyre::Result<()> + Send,
156 Node: FullNodeComponents,
157 EthApi: EthApiTypes,
158{
159 fn on_rpc_started(
160 self: Box<Self>,
161 ctx: RpcContext<'_, Node, EthApi>,
162 handles: RethRpcServerHandles,
163 ) -> eyre::Result<()> {
164 (*self)(ctx, handles)
165 }
166}
167
168impl<Node, EthApi> OnRpcStarted<Node, EthApi> for ()
169where
170 Node: FullNodeComponents,
171 EthApi: EthApiTypes,
172{
173 fn on_rpc_started(
174 self: Box<Self>,
175 _: RpcContext<'_, Node, EthApi>,
176 _: RethRpcServerHandles,
177 ) -> eyre::Result<()> {
178 Ok(())
179 }
180}
181
182pub trait ExtendRpcModules<Node: FullNodeComponents, EthApi: EthApiTypes>: Send {
184 fn extend_rpc_modules(self: Box<Self>, ctx: RpcContext<'_, Node, EthApi>) -> eyre::Result<()>;
186}
187
188impl<Node, EthApi, F> ExtendRpcModules<Node, EthApi> for F
189where
190 F: FnOnce(RpcContext<'_, Node, EthApi>) -> eyre::Result<()> + Send,
191 Node: FullNodeComponents,
192 EthApi: EthApiTypes,
193{
194 fn extend_rpc_modules(self: Box<Self>, ctx: RpcContext<'_, Node, EthApi>) -> eyre::Result<()> {
195 (*self)(ctx)
196 }
197}
198
199impl<Node, EthApi> ExtendRpcModules<Node, EthApi> for ()
200where
201 Node: FullNodeComponents,
202 EthApi: EthApiTypes,
203{
204 fn extend_rpc_modules(self: Box<Self>, _: RpcContext<'_, Node, EthApi>) -> eyre::Result<()> {
205 Ok(())
206 }
207}
208
209#[derive(Debug, Clone)]
211#[expect(clippy::type_complexity)]
212pub struct RpcRegistry<Node: FullNodeComponents, EthApi: EthApiTypes> {
213 pub(crate) registry: RpcRegistryInner<
214 Node::Provider,
215 Node::Pool,
216 Node::Network,
217 EthApi,
218 Node::Evm,
219 Node::Consensus,
220 >,
221}
222
223impl<Node, EthApi> Deref for RpcRegistry<Node, EthApi>
224where
225 Node: FullNodeComponents,
226 EthApi: EthApiTypes,
227{
228 type Target = RpcRegistryInner<
229 Node::Provider,
230 Node::Pool,
231 Node::Network,
232 EthApi,
233 Node::Evm,
234 Node::Consensus,
235 >;
236
237 fn deref(&self) -> &Self::Target {
238 &self.registry
239 }
240}
241
242impl<Node, EthApi> DerefMut for RpcRegistry<Node, EthApi>
243where
244 Node: FullNodeComponents,
245 EthApi: EthApiTypes,
246{
247 fn deref_mut(&mut self) -> &mut Self::Target {
248 &mut self.registry
249 }
250}
251
252#[expect(missing_debug_implementations)]
254pub struct RpcModuleContainer<'a, Node: FullNodeComponents, EthApi: EthApiTypes> {
255 pub modules: &'a mut TransportRpcModules,
257 pub auth_module: &'a mut AuthRpcModule,
259 pub registry: &'a mut RpcRegistry<Node, EthApi>,
261}
262
263#[expect(missing_debug_implementations)]
271pub struct RpcContext<'a, Node: FullNodeComponents, EthApi: EthApiTypes> {
272 pub(crate) node: Node,
274
275 pub(crate) config: &'a NodeConfig<<Node::Types as NodeTypes>::ChainSpec>,
277
278 pub registry: &'a mut RpcRegistry<Node, EthApi>,
282 pub modules: &'a mut TransportRpcModules,
287 pub auth_module: &'a mut AuthRpcModule,
291}
292
293impl<Node, EthApi> RpcContext<'_, Node, EthApi>
294where
295 Node: FullNodeComponents,
296 EthApi: EthApiTypes,
297{
298 pub const fn config(&self) -> &NodeConfig<<Node::Types as NodeTypes>::ChainSpec> {
300 self.config
301 }
302
303 pub const fn node(&self) -> &Node {
307 &self.node
308 }
309
310 pub fn pool(&self) -> &Node::Pool {
312 self.node.pool()
313 }
314
315 pub fn provider(&self) -> &Node::Provider {
317 self.node.provider()
318 }
319
320 pub fn network(&self) -> &Node::Network {
322 self.node.network()
323 }
324
325 pub fn payload_builder_handle(
327 &self,
328 ) -> &PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload> {
329 self.node.payload_builder_handle()
330 }
331}
332
333pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
335 pub rpc_server_handles: RethRpcServerHandles,
337 pub rpc_registry: RpcRegistry<Node, EthApi>,
339 pub engine_events: EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
344 pub beacon_engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
346 pub engine_shutdown: EngineShutdown,
348}
349
350impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
351 fn clone(&self) -> Self {
352 Self {
353 rpc_server_handles: self.rpc_server_handles.clone(),
354 rpc_registry: self.rpc_registry.clone(),
355 engine_events: self.engine_events.clone(),
356 beacon_engine_handle: self.beacon_engine_handle.clone(),
357 engine_shutdown: self.engine_shutdown.clone(),
358 }
359 }
360}
361
362impl<Node: FullNodeComponents, EthApi: EthApiTypes> Deref for RpcHandle<Node, EthApi> {
363 type Target = RpcRegistry<Node, EthApi>;
364
365 fn deref(&self) -> &Self::Target {
366 &self.rpc_registry
367 }
368}
369
370impl<Node: FullNodeComponents, EthApi: EthApiTypes> Debug for RpcHandle<Node, EthApi>
371where
372 RpcRegistry<Node, EthApi>: Debug,
373{
374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375 f.debug_struct("RpcHandle")
376 .field("rpc_server_handles", &self.rpc_server_handles)
377 .field("rpc_registry", &self.rpc_registry)
378 .field("engine_shutdown", &self.engine_shutdown)
379 .finish()
380 }
381}
382
383impl<Node: FullNodeComponents, EthApi: EthApiTypes> RpcHandle<Node, EthApi> {
384 pub const fn rpc_server_handles(&self) -> &RethRpcServerHandles {
386 &self.rpc_server_handles
387 }
388
389 pub const fn consensus_engine_handle(
393 &self,
394 ) -> &ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload> {
395 &self.beacon_engine_handle
396 }
397
398 pub const fn consensus_engine_events(
400 &self,
401 ) -> &EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>> {
402 &self.engine_events
403 }
404
405 pub const fn eth_api(&self) -> &EthApi {
407 self.rpc_registry.registry.eth_api()
408 }
409
410 pub fn admin_api(
412 &self,
413 ) -> AdminApi<Node::Network, <Node::Types as NodeTypes>::ChainSpec, Node::Pool>
414 where
415 <Node::Types as NodeTypes>::ChainSpec: EthereumHardforks,
416 {
417 self.rpc_registry.registry.admin_api()
418 }
419}
420
421#[derive(Debug, Clone)]
427pub struct RpcServerOnlyHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
428 pub rpc_server_handle: RpcServerHandle,
430 pub rpc_registry: RpcRegistry<Node, EthApi>,
432 pub engine_events: EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
434 pub engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
436}
437
438impl<Node: FullNodeComponents, EthApi: EthApiTypes> RpcServerOnlyHandle<Node, EthApi> {
439 pub const fn rpc_server_handle(&self) -> &RpcServerHandle {
441 &self.rpc_server_handle
442 }
443
444 pub const fn consensus_engine_handle(
448 &self,
449 ) -> &ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload> {
450 &self.engine_handle
451 }
452
453 pub const fn consensus_engine_events(
455 &self,
456 ) -> &EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>> {
457 &self.engine_events
458 }
459}
460
461#[derive(Debug, Clone)]
467pub struct AuthServerOnlyHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
468 pub auth_server_handle: AuthServerHandle,
470 pub rpc_registry: RpcRegistry<Node, EthApi>,
472 pub engine_events: EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
474 pub engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
476}
477
478impl<Node: FullNodeComponents, EthApi: EthApiTypes> AuthServerOnlyHandle<Node, EthApi> {
479 pub const fn consensus_engine_handle(
483 &self,
484 ) -> &ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload> {
485 &self.engine_handle
486 }
487
488 pub const fn consensus_engine_events(
490 &self,
491 ) -> &EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>> {
492 &self.engine_events
493 }
494}
495
496struct RpcSetupContext<'a, Node: FullNodeComponents, EthApi: EthApiTypes> {
498 node: Node,
499 config: &'a NodeConfig<<Node::Types as NodeTypes>::ChainSpec>,
500 modules: TransportRpcModules,
501 auth_module: AuthRpcModule,
502 auth_config: reth_rpc_builder::auth::AuthServerConfig,
503 registry: RpcRegistry<Node, EthApi>,
504 on_rpc_started: Box<dyn OnRpcStarted<Node, EthApi>>,
505 engine_events: EventSender<ConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
506 engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
507}
508
509pub struct RpcAddOns<
520 Node: FullNodeComponents,
521 EthB: EthApiBuilder<Node>,
522 PVB,
523 EB = BasicEngineApiBuilder<PVB>,
524 EVB = BasicEngineValidatorBuilder<PVB>,
525 RpcMiddleware = Identity,
526 AuthHttpMiddleware = Identity,
527> {
528 pub hooks: RpcHooks<Node, EthB::EthApi>,
530 eth_api_builder: EthB,
532 payload_validator_builder: PVB,
534 engine_api_builder: EB,
536 engine_validator_builder: EVB,
538 rpc_middleware: RpcMiddleware,
543 auth_http_middleware: AuthHttpMiddleware,
548 tokio_runtime: Option<tokio::runtime::Handle>,
550}
551
552impl<Node, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware> Debug
553 for RpcAddOns<Node, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware>
554where
555 Node: FullNodeComponents,
556 EthB: EthApiBuilder<Node>,
557 PVB: Debug,
558 EB: Debug,
559 EVB: Debug,
560{
561 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
562 f.debug_struct("RpcAddOns")
563 .field("hooks", &self.hooks)
564 .field("eth_api_builder", &"...")
565 .field("payload_validator_builder", &self.payload_validator_builder)
566 .field("engine_api_builder", &self.engine_api_builder)
567 .field("engine_validator_builder", &self.engine_validator_builder)
568 .field("rpc_middleware", &"...")
569 .finish()
570 }
571}
572
573impl<Node, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware>
574 RpcAddOns<Node, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware>
575where
576 Node: FullNodeComponents,
577 EthB: EthApiBuilder<Node>,
578{
579 pub fn new(
581 eth_api_builder: EthB,
582 payload_validator_builder: PVB,
583 engine_api_builder: EB,
584 engine_validator_builder: EVB,
585 rpc_middleware: RpcMiddleware,
586 auth_http_middleware: AuthHttpMiddleware,
587 ) -> Self {
588 Self {
589 hooks: RpcHooks::default(),
590 eth_api_builder,
591 payload_validator_builder,
592 engine_api_builder,
593 engine_validator_builder,
594 rpc_middleware,
595 auth_http_middleware,
596 tokio_runtime: None,
597 }
598 }
599
600 pub fn with_engine_api<T>(
602 self,
603 engine_api_builder: T,
604 ) -> RpcAddOns<Node, EthB, PVB, T, EVB, RpcMiddleware, AuthHttpMiddleware> {
605 let Self {
606 hooks,
607 eth_api_builder,
608 payload_validator_builder,
609 engine_validator_builder,
610 rpc_middleware,
611 auth_http_middleware,
612 tokio_runtime,
613 ..
614 } = self;
615 RpcAddOns {
616 hooks,
617 eth_api_builder,
618 payload_validator_builder,
619 engine_api_builder,
620 engine_validator_builder,
621 rpc_middleware,
622 auth_http_middleware,
623 tokio_runtime,
624 }
625 }
626
627 pub fn with_payload_validator<T>(
629 self,
630 payload_validator_builder: T,
631 ) -> RpcAddOns<Node, EthB, T, EB, EVB, RpcMiddleware, AuthHttpMiddleware> {
632 let Self {
633 hooks,
634 eth_api_builder,
635 engine_api_builder,
636 engine_validator_builder,
637 rpc_middleware,
638 auth_http_middleware,
639 tokio_runtime,
640 ..
641 } = self;
642 RpcAddOns {
643 hooks,
644 eth_api_builder,
645 payload_validator_builder,
646 engine_api_builder,
647 engine_validator_builder,
648 rpc_middleware,
649 auth_http_middleware,
650 tokio_runtime,
651 }
652 }
653
654 pub fn with_engine_validator<T>(
656 self,
657 engine_validator_builder: T,
658 ) -> RpcAddOns<Node, EthB, PVB, EB, T, RpcMiddleware, AuthHttpMiddleware> {
659 let Self {
660 hooks,
661 eth_api_builder,
662 payload_validator_builder,
663 engine_api_builder,
664 rpc_middleware,
665 auth_http_middleware,
666 tokio_runtime,
667 ..
668 } = self;
669 RpcAddOns {
670 hooks,
671 eth_api_builder,
672 payload_validator_builder,
673 engine_api_builder,
674 engine_validator_builder,
675 rpc_middleware,
676 auth_http_middleware,
677 tokio_runtime,
678 }
679 }
680
681 pub fn with_rpc_middleware<T>(
720 self,
721 rpc_middleware: T,
722 ) -> RpcAddOns<Node, EthB, PVB, EB, EVB, T, AuthHttpMiddleware> {
723 let Self {
724 hooks,
725 eth_api_builder,
726 payload_validator_builder,
727 engine_api_builder,
728 engine_validator_builder,
729 auth_http_middleware,
730 tokio_runtime,
731 ..
732 } = self;
733 RpcAddOns {
734 hooks,
735 eth_api_builder,
736 payload_validator_builder,
737 engine_api_builder,
738 engine_validator_builder,
739 rpc_middleware,
740 auth_http_middleware,
741 tokio_runtime,
742 }
743 }
744
745 pub fn with_auth_http_middleware<T>(
750 self,
751 auth_http_middleware: T,
752 ) -> RpcAddOns<Node, EthB, PVB, EB, EVB, RpcMiddleware, T> {
753 let Self {
754 hooks,
755 eth_api_builder,
756 payload_validator_builder,
757 engine_api_builder,
758 engine_validator_builder,
759 rpc_middleware,
760 tokio_runtime,
761 ..
762 } = self;
763 RpcAddOns {
764 hooks,
765 eth_api_builder,
766 payload_validator_builder,
767 engine_api_builder,
768 engine_validator_builder,
769 rpc_middleware,
770 auth_http_middleware,
771 tokio_runtime,
772 }
773 }
774
775 pub fn layer_auth_http_middleware<T>(
777 self,
778 layer: T,
779 ) -> RpcAddOns<Node, EthB, PVB, EB, EVB, RpcMiddleware, Stack<AuthHttpMiddleware, T>> {
780 let Self {
781 hooks,
782 eth_api_builder,
783 payload_validator_builder,
784 engine_api_builder,
785 engine_validator_builder,
786 rpc_middleware,
787 auth_http_middleware,
788 tokio_runtime,
789 } = self;
790 let auth_http_middleware = Stack::new(auth_http_middleware, layer);
791 RpcAddOns {
792 hooks,
793 eth_api_builder,
794 payload_validator_builder,
795 engine_api_builder,
796 engine_validator_builder,
797 rpc_middleware,
798 auth_http_middleware,
799 tokio_runtime,
800 }
801 }
802
803 #[expect(clippy::type_complexity)]
805 pub fn option_layer_auth_http_middleware<T>(
806 self,
807 layer: Option<T>,
808 ) -> RpcAddOns<
809 Node,
810 EthB,
811 PVB,
812 EB,
813 EVB,
814 RpcMiddleware,
815 Stack<AuthHttpMiddleware, Either<T, Identity>>,
816 > {
817 let layer = layer.map(Either::Left).unwrap_or(Either::Right(Identity::new()));
818 self.layer_auth_http_middleware(layer)
819 }
820
821 pub fn with_tokio_runtime(self, tokio_runtime: Option<tokio::runtime::Handle>) -> Self {
825 let Self {
826 hooks,
827 eth_api_builder,
828 payload_validator_builder,
829 engine_validator_builder,
830 engine_api_builder,
831 rpc_middleware,
832 auth_http_middleware,
833 ..
834 } = self;
835 Self {
836 hooks,
837 eth_api_builder,
838 payload_validator_builder,
839 engine_validator_builder,
840 engine_api_builder,
841 rpc_middleware,
842 auth_http_middleware,
843 tokio_runtime,
844 }
845 }
846
847 pub fn layer_rpc_middleware<T>(
849 self,
850 layer: T,
851 ) -> RpcAddOns<Node, EthB, PVB, EB, EVB, Stack<RpcMiddleware, T>, AuthHttpMiddleware> {
852 let Self {
853 hooks,
854 eth_api_builder,
855 payload_validator_builder,
856 engine_api_builder,
857 engine_validator_builder,
858 rpc_middleware,
859 auth_http_middleware,
860 tokio_runtime,
861 } = self;
862 let rpc_middleware = Stack::new(rpc_middleware, layer);
863 RpcAddOns {
864 hooks,
865 eth_api_builder,
866 payload_validator_builder,
867 engine_api_builder,
868 engine_validator_builder,
869 rpc_middleware,
870 auth_http_middleware,
871 tokio_runtime,
872 }
873 }
874
875 #[expect(clippy::type_complexity)]
877 pub fn option_layer_rpc_middleware<T>(
878 self,
879 layer: Option<T>,
880 ) -> RpcAddOns<
881 Node,
882 EthB,
883 PVB,
884 EB,
885 EVB,
886 Stack<RpcMiddleware, Either<T, Identity>>,
887 AuthHttpMiddleware,
888 > {
889 let layer = layer.map(Either::Left).unwrap_or(Either::Right(Identity::new()));
890 self.layer_rpc_middleware(layer)
891 }
892
893 pub fn on_rpc_started<F>(mut self, hook: F) -> Self
895 where
896 F: FnOnce(RpcContext<'_, Node, EthB::EthApi>, RethRpcServerHandles) -> eyre::Result<()>
897 + Send
898 + 'static,
899 {
900 self.hooks.set_on_rpc_started(hook);
901 self
902 }
903
904 pub fn extend_rpc_modules<F>(mut self, hook: F) -> Self
906 where
907 F: FnOnce(RpcContext<'_, Node, EthB::EthApi>) -> eyre::Result<()> + Send + 'static,
908 {
909 self.hooks.set_extend_rpc_modules(hook);
910 self
911 }
912}
913
914impl<Node, EthB, EV, EB, Engine> Default
915 for RpcAddOns<Node, EthB, EV, EB, Engine, Identity, Identity>
916where
917 Node: FullNodeComponents,
918 EthB: EthApiBuilder<Node>,
919 EV: Default,
920 EB: Default,
921 Engine: Default,
922{
923 fn default() -> Self {
924 Self::new(
925 EthB::default(),
926 EV::default(),
927 EB::default(),
928 Engine::default(),
929 Default::default(),
930 Identity::new(),
931 )
932 }
933}
934
935impl<N, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware>
936 RpcAddOns<N, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware>
937where
938 N: FullNodeComponents,
939 N::Provider: ChainSpecProvider<ChainSpec: EthereumHardforks>,
940 EthB: EthApiBuilder<N>,
941 EB: EngineApiBuilder<N>,
942 EVB: EngineValidatorBuilder<N>,
943 RpcMiddleware: RethRpcMiddleware,
944 AuthHttpMiddleware: RethAuthHttpMiddleware<Identity>,
945{
946 pub async fn launch_rpc_server<F>(
952 self,
953 ctx: AddOnsContext<'_, N>,
954 ext: F,
955 ) -> eyre::Result<RpcServerOnlyHandle<N, EthB::EthApi>>
956 where
957 F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
958 {
959 let rpc_middleware = self.rpc_middleware.clone();
960 let tokio_runtime = self.tokio_runtime.clone();
961 let setup_ctx = self.setup_rpc_components(ctx, ext).await?;
962 let RpcSetupContext {
963 node,
964 config,
965 mut modules,
966 mut auth_module,
967 auth_config: _,
968 mut registry,
969 on_rpc_started,
970 engine_events,
971 engine_handle,
972 } = setup_ctx;
973
974 let server_config = config
975 .rpc
976 .rpc_server_config()
977 .set_rpc_middleware(rpc_middleware)
978 .with_tokio_runtime(tokio_runtime);
979 let rpc_server_handle = Self::launch_rpc_server_internal(server_config, &modules).await?;
980
981 let handles =
982 RethRpcServerHandles { rpc: rpc_server_handle.clone(), auth: AuthServerHandle::noop() };
983 Self::finalize_rpc_setup(
984 &mut registry,
985 &mut modules,
986 &mut auth_module,
987 &node,
988 config,
989 on_rpc_started,
990 handles,
991 )?;
992
993 Ok(RpcServerOnlyHandle {
994 rpc_server_handle,
995 rpc_registry: registry,
996 engine_events,
997 engine_handle,
998 })
999 }
1000
1001 pub async fn launch_add_ons_with<F>(
1004 self,
1005 ctx: AddOnsContext<'_, N>,
1006 ext: F,
1007 ) -> eyre::Result<RpcHandle<N, EthB::EthApi>>
1008 where
1009 F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
1010 {
1011 let disable_auth = ctx.config.rpc.disable_auth_server;
1013 self.launch_add_ons_with_opt_engine(ctx, ext, disable_auth).await
1014 }
1015
1016 pub async fn launch_add_ons_with_opt_engine<F>(
1022 self,
1023 ctx: AddOnsContext<'_, N>,
1024 ext: F,
1025 disable_auth: bool,
1026 ) -> eyre::Result<RpcHandle<N, EthB::EthApi>>
1027 where
1028 F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
1029 {
1030 let rpc_middleware = self.rpc_middleware.clone();
1031 let auth_http_middleware = self.auth_http_middleware.clone();
1032 let tokio_runtime = self.tokio_runtime.clone();
1033 let setup_ctx = self.setup_rpc_components(ctx, ext).await?;
1034 let RpcSetupContext {
1035 node,
1036 config,
1037 mut modules,
1038 mut auth_module,
1039 auth_config,
1040 mut registry,
1041 on_rpc_started,
1042 engine_events,
1043 engine_handle,
1044 } = setup_ctx;
1045
1046 let server_config = config
1047 .rpc
1048 .rpc_server_config()
1049 .set_rpc_middleware(rpc_middleware)
1050 .with_tokio_runtime(tokio_runtime);
1051
1052 let auth_config = auth_config.with_http_middleware(auth_http_middleware);
1053
1054 let (rpc, auth) = if disable_auth {
1055 let rpc = Self::launch_rpc_server_internal(server_config, &modules).await?;
1057 (rpc, AuthServerHandle::noop())
1058 } else {
1059 let auth_module_clone = auth_module.clone();
1060 let (rpc, auth) = futures::future::try_join(
1062 Self::launch_rpc_server_internal(server_config, &modules),
1063 Self::launch_auth_server_internal(auth_config.start(auth_module_clone)),
1064 )
1065 .await?;
1066 (rpc, auth)
1067 };
1068
1069 let handles = RethRpcServerHandles { rpc, auth };
1070
1071 Self::finalize_rpc_setup(
1072 &mut registry,
1073 &mut modules,
1074 &mut auth_module,
1075 &node,
1076 config,
1077 on_rpc_started,
1078 handles.clone(),
1079 )?;
1080
1081 Ok(RpcHandle {
1082 rpc_server_handles: handles,
1083 rpc_registry: registry,
1084 engine_events,
1085 beacon_engine_handle: engine_handle,
1086 engine_shutdown: EngineShutdown::default(),
1087 })
1088 }
1089
1090 async fn setup_rpc_components<'a, F>(
1092 self,
1093 ctx: AddOnsContext<'a, N>,
1094 ext: F,
1095 ) -> eyre::Result<RpcSetupContext<'a, N, EthB::EthApi>>
1096 where
1097 F: FnOnce(RpcModuleContainer<'_, N, EthB::EthApi>) -> eyre::Result<()>,
1098 {
1099 let Self { eth_api_builder, engine_api_builder, hooks, .. } = self;
1100
1101 let engine_api = engine_api_builder.build_engine_api(&ctx).await?;
1102 let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx;
1103
1104 info!(target: "reth::cli", "Engine API handler initialized");
1105
1106 let cache = EthStateCache::spawn_with(
1107 node.provider().clone(),
1108 config.rpc.eth_config().cache,
1109 node.task_executor().clone(),
1110 );
1111
1112 let new_canonical_blocks = node.provider().canonical_state_stream();
1113 let c = cache.clone();
1114 node.task_executor().spawn_critical_task("cache canonical blocks task", async move {
1115 cache_new_blocks_task(c, new_canonical_blocks).await;
1116 });
1117
1118 let eth_config = config.rpc.eth_config().max_batch_size(config.txpool.max_batch_size());
1119 let ctx = EthApiCtx {
1120 components: &node,
1121 config: eth_config,
1122 cache,
1123 engine_handle: beacon_engine_handle.clone(),
1124 };
1125 let eth_api = eth_api_builder.build_eth_api(ctx).await?;
1126
1127 let auth_config = config.rpc.auth_server_config(jwt_secret)?;
1128 let module_config = config.rpc.transport_rpc_module_config();
1129 debug!(target: "reth::cli", http=?module_config.http(), ws=?module_config.ws(), "Using RPC module config");
1130
1131 let (mut modules, mut auth_module, registry) = RpcModuleBuilder::default()
1132 .with_provider(node.provider().clone())
1133 .with_pool(node.pool().clone())
1134 .with_network(node.network().clone())
1135 .with_executor(node.task_executor().clone())
1136 .with_evm_config(node.evm_config().clone())
1137 .with_consensus(node.consensus().clone())
1138 .build_with_auth_server(
1139 module_config,
1140 engine_api,
1141 eth_api,
1142 engine_events.clone(),
1143 beacon_engine_handle.clone(),
1144 );
1145
1146 if config.dev.dev {
1148 let signers = DevSigner::from_mnemonic(config.dev.dev_mnemonic.as_str(), 20);
1149 registry.eth_api().signers().write().extend(signers);
1150 }
1151
1152 let mut registry = RpcRegistry { registry };
1153 let ctx = RpcContext {
1154 node: node.clone(),
1155 config,
1156 registry: &mut registry,
1157 modules: &mut modules,
1158 auth_module: &mut auth_module,
1159 };
1160
1161 let RpcHooks { on_rpc_started, extend_rpc_modules } = hooks;
1162
1163 ext(RpcModuleContainer {
1164 modules: ctx.modules,
1165 auth_module: ctx.auth_module,
1166 registry: ctx.registry,
1167 })?;
1168 extend_rpc_modules.extend_rpc_modules(ctx)?;
1169
1170 Ok(RpcSetupContext {
1171 node,
1172 config,
1173 modules,
1174 auth_module,
1175 auth_config,
1176 registry,
1177 on_rpc_started,
1178 engine_events,
1179 engine_handle: beacon_engine_handle,
1180 })
1181 }
1182
1183 async fn launch_rpc_server_internal<M>(
1185 server_config: RpcServerConfig<M>,
1186 modules: &TransportRpcModules,
1187 ) -> eyre::Result<RpcServerHandle>
1188 where
1189 M: RethRpcMiddleware,
1190 {
1191 let handle = server_config.start(modules).await?;
1192
1193 if let Some(path) = handle.ipc_endpoint() {
1194 info!(target: "reth::cli", %path, "RPC IPC server started");
1195 }
1196 if let Some(addr) = handle.http_local_addr() {
1197 info!(target: "reth::cli", url=%addr, "RPC HTTP server started");
1198 }
1199 if let Some(addr) = handle.ws_local_addr() {
1200 info!(target: "reth::cli", url=%addr, "RPC WS server started");
1201 }
1202
1203 Ok(handle)
1204 }
1205
1206 async fn launch_auth_server_internal(
1208 start_fut: impl Future<Output = Result<AuthServerHandle, reth_rpc_builder::error::RpcError>>,
1209 ) -> eyre::Result<AuthServerHandle> {
1210 start_fut
1211 .await
1212 .map_err(Into::into)
1213 .inspect(|handle| {
1214 let addr = handle.local_addr();
1215 if let Some(ipc_endpoint) = handle.ipc_endpoint() {
1216 info!(target: "reth::cli", url=%addr, ipc_endpoint=%ipc_endpoint, "RPC auth server started");
1217 } else {
1218 info!(target: "reth::cli", url=%addr, "RPC auth server started");
1219 }
1220 })
1221 }
1222
1223 fn finalize_rpc_setup(
1225 registry: &mut RpcRegistry<N, EthB::EthApi>,
1226 modules: &mut TransportRpcModules,
1227 auth_module: &mut AuthRpcModule,
1228 node: &N,
1229 config: &NodeConfig<<N::Types as NodeTypes>::ChainSpec>,
1230 on_rpc_started: Box<dyn OnRpcStarted<N, EthB::EthApi>>,
1231 handles: RethRpcServerHandles,
1232 ) -> eyre::Result<()> {
1233 let ctx = RpcContext { node: node.clone(), config, registry, modules, auth_module };
1234
1235 on_rpc_started.on_rpc_started(ctx, handles)?;
1236 Ok(())
1237 }
1238}
1239
1240impl<N, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware> NodeAddOns<N>
1241 for RpcAddOns<N, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware>
1242where
1243 N: FullNodeComponents,
1244 <N as FullNodeTypes>::Provider: ChainSpecProvider<ChainSpec: EthereumHardforks>,
1245 EthB: EthApiBuilder<N>,
1246 PVB: PayloadValidatorBuilder<N>,
1247 EB: EngineApiBuilder<N>,
1248 EVB: EngineValidatorBuilder<N>,
1249 RpcMiddleware: RethRpcMiddleware,
1250 AuthHttpMiddleware: RethAuthHttpMiddleware<Identity>,
1251{
1252 type Handle = RpcHandle<N, EthB::EthApi>;
1253
1254 async fn launch_add_ons(self, ctx: AddOnsContext<'_, N>) -> eyre::Result<Self::Handle> {
1255 self.launch_add_ons_with(ctx, |_| Ok(())).await
1256 }
1257}
1258
1259pub trait RethRpcAddOns<N: FullNodeComponents>:
1262 NodeAddOns<N, Handle = RpcHandle<N, Self::EthApi>>
1263{
1264 type EthApi: EthApiTypes;
1266
1267 fn hooks_mut(&mut self) -> &mut RpcHooks<N, Self::EthApi>;
1269}
1270
1271impl<N: FullNodeComponents, EthB, EV, EB, Engine, RpcMiddleware, AuthHttpMiddleware>
1272 RethRpcAddOns<N> for RpcAddOns<N, EthB, EV, EB, Engine, RpcMiddleware, AuthHttpMiddleware>
1273where
1274 Self: NodeAddOns<N, Handle = RpcHandle<N, EthB::EthApi>>,
1275 EthB: EthApiBuilder<N>,
1276{
1277 type EthApi = EthB::EthApi;
1278
1279 fn hooks_mut(&mut self) -> &mut RpcHooks<N, Self::EthApi> {
1280 &mut self.hooks
1281 }
1282}
1283
1284#[derive(Debug)]
1287pub struct EthApiCtx<'a, N: FullNodeTypes> {
1288 pub components: &'a N,
1290 pub config: EthConfig,
1292 pub cache: EthStateCache<PrimitivesTy<N::Types>>,
1294 pub engine_handle: ConsensusEngineHandle<<N::Types as NodeTypes>::Payload>,
1296}
1297
1298impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: Hardforks + EthereumHardforks>>>
1299 EthApiCtx<'a, N>
1300{
1301 pub fn eth_api_builder(self) -> reth_rpc::EthApiBuilder<N, EthRpcConverterFor<N>> {
1303 reth_rpc::EthApiBuilder::new_with_components(self.components.clone())
1304 .eth_cache(self.cache)
1305 .task_spawner(self.components.task_executor().clone())
1306 .gas_cap(self.config.rpc_gas_cap.into())
1307 .max_simulate_blocks(self.config.rpc_max_simulate_blocks)
1308 .eth_proof_window(self.config.eth_proof_window)
1309 .fee_history_cache_config(self.config.fee_history_cache)
1310 .proof_permits(self.config.proof_permits)
1311 .gas_oracle_config(self.config.gas_oracle)
1312 .max_batch_size(self.config.max_batch_size)
1313 .max_blocking_io_requests(self.config.max_blocking_io_requests)
1314 .pending_block_kind(self.config.pending_block_kind)
1315 .raw_tx_forwarder(self.config.raw_tx_forwarder)
1316 .evm_memory_limit(self.config.rpc_evm_memory_limit)
1317 .force_blob_sidecar_upcasting(self.config.force_blob_sidecar_upcasting)
1318 }
1319}
1320
1321pub trait EthApiBuilder<N: FullNodeComponents>: Default + Send + 'static {
1323 type EthApi: FullEthApiServer<Provider = N::Provider, Pool = N::Pool>;
1325
1326 fn build_eth_api(
1328 self,
1329 ctx: EthApiCtx<'_, N>,
1330 ) -> impl Future<Output = eyre::Result<Self::EthApi>> + Send;
1331}
1332
1333pub trait EngineValidatorAddOn<Node: FullNodeComponents>: Send {
1335 type ValidatorBuilder: EngineValidatorBuilder<Node>;
1337
1338 fn engine_validator_builder(&self) -> Self::ValidatorBuilder;
1340}
1341
1342impl<N, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware> EngineValidatorAddOn<N>
1343 for RpcAddOns<N, EthB, PVB, EB, EVB, RpcMiddleware, AuthHttpMiddleware>
1344where
1345 N: FullNodeComponents,
1346 EthB: EthApiBuilder<N>,
1347 PVB: Send,
1348 EB: EngineApiBuilder<N>,
1349 EVB: EngineValidatorBuilder<N>,
1350 RpcMiddleware: Send,
1351 AuthHttpMiddleware: Send,
1352{
1353 type ValidatorBuilder = EVB;
1354
1355 fn engine_validator_builder(&self) -> Self::ValidatorBuilder {
1356 self.engine_validator_builder.clone()
1357 }
1358}
1359
1360pub trait EngineApiBuilder<Node: FullNodeComponents>: Send + Sync {
1367 type EngineApi: IntoEngineApiRpcModule + Send + Sync;
1369
1370 fn build_engine_api(
1375 self,
1376 ctx: &AddOnsContext<'_, Node>,
1377 ) -> impl Future<Output = eyre::Result<Self::EngineApi>> + Send;
1378}
1379
1380pub trait PayloadValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone {
1385 type Validator: PayloadValidator<<Node::Types as NodeTypes>::Payload>;
1387
1388 fn build(
1393 self,
1394 ctx: &AddOnsContext<'_, Node>,
1395 ) -> impl Future<Output = eyre::Result<Self::Validator>> + Send;
1396}
1397
1398pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone {
1403 type EngineValidator: EngineValidator<<Node::Types as NodeTypes>::Payload, <Node::Types as NodeTypes>::Primitives>
1405 + WaitForCaches;
1406
1407 fn build_tree_validator(
1411 self,
1412 ctx: &AddOnsContext<'_, Node>,
1413 tree_config: TreeConfig,
1414 changeset_cache: ChangesetCache,
1415 ) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
1416}
1417
1418#[derive(Debug, Clone)]
1422pub struct BasicEngineValidatorBuilder<EV> {
1423 payload_validator_builder: EV,
1425}
1426
1427impl<EV> BasicEngineValidatorBuilder<EV> {
1428 pub const fn new(payload_validator_builder: EV) -> Self {
1430 Self { payload_validator_builder }
1431 }
1432}
1433
1434impl<EV> Default for BasicEngineValidatorBuilder<EV>
1435where
1436 EV: Default,
1437{
1438 fn default() -> Self {
1439 Self::new(EV::default())
1440 }
1441}
1442
1443impl<Node, EV> EngineValidatorBuilder<Node> for BasicEngineValidatorBuilder<EV>
1444where
1445 Node: FullNodeComponents<
1446 Evm: ConfigureEngineEvm<
1447 <<Node::Types as NodeTypes>::Payload as PayloadTypes>::ExecutionData,
1448 >,
1449 >,
1450 EV: PayloadValidatorBuilder<Node>,
1451 EV::Validator: reth_engine_primitives::PayloadValidator<
1452 <Node::Types as NodeTypes>::Payload,
1453 Block = BlockTy<Node::Types>,
1454 > + Clone,
1455{
1456 type EngineValidator = BasicEngineValidator<Node::Provider, Node::Evm, EV::Validator>;
1457
1458 async fn build_tree_validator(
1459 self,
1460 ctx: &AddOnsContext<'_, Node>,
1461 tree_config: TreeConfig,
1462 changeset_cache: ChangesetCache,
1463 ) -> eyre::Result<Self::EngineValidator> {
1464 let validator = self.payload_validator_builder.build(ctx).await?;
1465 let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
1466 let invalid_block_hook = ctx.create_invalid_block_hook(&data_dir).await?;
1467
1468 Ok(BasicEngineValidator::new(
1469 ctx.node.provider().clone(),
1470 std::sync::Arc::new(ctx.node.consensus().clone()),
1471 ctx.node.evm_config().clone(),
1472 validator,
1473 tree_config,
1474 invalid_block_hook,
1475 changeset_cache,
1476 ctx.node.task_executor().clone(),
1477 ))
1478 }
1479}
1480
1481#[derive(Debug, Default)]
1487pub struct BasicEngineApiBuilder<PVB> {
1488 payload_validator_builder: PVB,
1489}
1490
1491impl<N, PVB> EngineApiBuilder<N> for BasicEngineApiBuilder<PVB>
1492where
1493 N: FullNodeComponents<
1494 Types: NodeTypes<
1495 ChainSpec: EthereumHardforks,
1496 Payload: PayloadTypes<ExecutionData = ExecutionData> + EngineTypes,
1497 >,
1498 >,
1499 PVB: PayloadValidatorBuilder<N>,
1500 PVB::Validator: EngineApiValidator<<N::Types as NodeTypes>::Payload>,
1501{
1502 type EngineApi = EngineApi<
1503 N::Provider,
1504 <N::Types as NodeTypes>::Payload,
1505 N::Pool,
1506 PVB::Validator,
1507 <N::Types as NodeTypes>::ChainSpec,
1508 >;
1509
1510 async fn build_engine_api(self, ctx: &AddOnsContext<'_, N>) -> eyre::Result<Self::EngineApi> {
1511 let Self { payload_validator_builder } = self;
1512
1513 let engine_validator = payload_validator_builder.build(ctx).await?;
1514 let client = ClientVersionV1 {
1515 code: CLIENT_CODE,
1516 name: version_metadata().name_client.to_string(),
1517 version: version_metadata().cargo_pkg_version.to_string(),
1518 commit: version_metadata().vergen_git_sha.to_string(),
1519 };
1520
1521 Ok(EngineApi::new(
1522 ctx.node.provider().clone(),
1523 ctx.config.chain.clone(),
1524 ctx.beacon_engine_handle.clone(),
1525 PayloadStore::new(ctx.node.payload_builder_handle().clone()),
1526 ctx.node.pool().clone(),
1527 ctx.node.task_executor().clone(),
1528 client,
1529 EngineCapabilities::default(),
1530 engine_validator,
1531 ctx.config.engine.accept_execution_requests_hash,
1532 ctx.node.network().clone(),
1533 ))
1534 }
1535}
1536
1537#[derive(Debug, Clone, Default)]
1543#[non_exhaustive]
1544pub struct NoopEngineApiBuilder;
1545
1546impl<N: FullNodeComponents> EngineApiBuilder<N> for NoopEngineApiBuilder {
1547 type EngineApi = NoopEngineApi;
1548
1549 async fn build_engine_api(self, _ctx: &AddOnsContext<'_, N>) -> eyre::Result<Self::EngineApi> {
1550 Ok(NoopEngineApi::default())
1551 }
1552}
1553
1554#[derive(Debug, Clone, Default)]
1559#[non_exhaustive]
1560pub struct NoopEngineApi;
1561
1562impl IntoEngineApiRpcModule for NoopEngineApi {
1563 fn into_rpc_module(self) -> RpcModule<()> {
1564 RpcModule::new(())
1565 }
1566}
1567
1568#[derive(Clone, Debug)]
1573pub struct EngineShutdown {
1574 tx: Arc<Mutex<Option<oneshot::Sender<EngineShutdownRequest>>>>,
1576}
1577
1578impl EngineShutdown {
1579 pub fn new() -> (Self, oneshot::Receiver<EngineShutdownRequest>) {
1581 let (tx, rx) = oneshot::channel();
1582 (Self { tx: Arc::new(Mutex::new(Some(tx))) }, rx)
1583 }
1584
1585 pub fn shutdown(&self) -> Option<oneshot::Receiver<()>> {
1592 let mut guard = self.tx.lock();
1593 let tx = guard.take()?;
1594 let (done_tx, done_rx) = oneshot::channel();
1595 let _ = tx.send(EngineShutdownRequest { done_tx });
1596 Some(done_rx)
1597 }
1598}
1599
1600impl Default for EngineShutdown {
1601 fn default() -> Self {
1602 Self { tx: Arc::new(Mutex::new(None)) }
1603 }
1604}
1605
1606#[derive(Debug)]
1608pub struct EngineShutdownRequest {
1609 pub done_tx: oneshot::Sender<()>,
1611}