1use crate::{BeaconConsensusEngineEvent, BeaconConsensusEngineHandle};
4use alloy_rpc_types::engine::ClientVersionV1;
5use alloy_rpc_types_engine::ExecutionData;
6use futures::TryFutureExt;
7use reth_chain_state::CanonStateSubscriptions;
8use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
9use reth_node_api::{
10 AddOnsContext, BlockTy, EngineTypes, EngineValidator, FullNodeComponents, FullNodeTypes,
11 NodeAddOns, NodeTypes, PayloadTypes, ReceiptTy,
12};
13use reth_node_core::{
14 node_config::NodeConfig,
15 version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA},
16};
17use reth_payload_builder::{PayloadBuilderHandle, PayloadStore};
18use reth_rpc::eth::{EthApiTypes, FullEthApiServer};
19use reth_rpc_api::{eth::helpers::AddDevSigners, IntoEngineApiRpcModule};
20use reth_rpc_builder::{
21 auth::{AuthRpcModule, AuthServerHandle},
22 config::RethRpcServerConfig,
23 RpcModuleBuilder, RpcRegistryInner, RpcServerHandle, TransportRpcModules,
24};
25use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
26use reth_rpc_eth_types::{cache::cache_new_blocks_task, EthConfig, EthStateCache};
27use reth_tasks::TaskExecutor;
28use reth_tokio_util::EventSender;
29use reth_tracing::tracing::{debug, info};
30use std::{
31 fmt::{self, Debug},
32 future::Future,
33 ops::{Deref, DerefMut},
34};
35
36#[derive(Debug, Clone)]
40pub struct RethRpcServerHandles {
41 pub rpc: RpcServerHandle,
43 pub auth: AuthServerHandle,
45}
46
47pub struct RpcHooks<Node: FullNodeComponents, EthApi> {
49 pub on_rpc_started: Box<dyn OnRpcStarted<Node, EthApi>>,
51 pub extend_rpc_modules: Box<dyn ExtendRpcModules<Node, EthApi>>,
53}
54
55impl<Node, EthApi> Default for RpcHooks<Node, EthApi>
56where
57 Node: FullNodeComponents,
58 EthApi: EthApiTypes,
59{
60 fn default() -> Self {
61 Self { on_rpc_started: Box::<()>::default(), extend_rpc_modules: Box::<()>::default() }
62 }
63}
64
65impl<Node, EthApi> RpcHooks<Node, EthApi>
66where
67 Node: FullNodeComponents,
68 EthApi: EthApiTypes,
69{
70 pub(crate) fn set_on_rpc_started<F>(&mut self, hook: F) -> &mut Self
72 where
73 F: OnRpcStarted<Node, EthApi> + 'static,
74 {
75 self.on_rpc_started = Box::new(hook);
76 self
77 }
78
79 #[expect(unused)]
81 pub(crate) fn on_rpc_started<F>(mut self, hook: F) -> Self
82 where
83 F: OnRpcStarted<Node, EthApi> + 'static,
84 {
85 self.set_on_rpc_started(hook);
86 self
87 }
88
89 pub(crate) fn set_extend_rpc_modules<F>(&mut self, hook: F) -> &mut Self
91 where
92 F: ExtendRpcModules<Node, EthApi> + 'static,
93 {
94 self.extend_rpc_modules = Box::new(hook);
95 self
96 }
97
98 #[expect(unused)]
100 pub(crate) fn extend_rpc_modules<F>(mut self, hook: F) -> Self
101 where
102 F: ExtendRpcModules<Node, EthApi> + 'static,
103 {
104 self.set_extend_rpc_modules(hook);
105 self
106 }
107}
108
109impl<Node, EthApi> fmt::Debug for RpcHooks<Node, EthApi>
110where
111 Node: FullNodeComponents,
112 EthApi: EthApiTypes,
113{
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 f.debug_struct("RpcHooks")
116 .field("on_rpc_started", &"...")
117 .field("extend_rpc_modules", &"...")
118 .finish()
119 }
120}
121
122pub trait OnRpcStarted<Node: FullNodeComponents, EthApi: EthApiTypes>: Send {
124 fn on_rpc_started(
126 self: Box<Self>,
127 ctx: RpcContext<'_, Node, EthApi>,
128 handles: RethRpcServerHandles,
129 ) -> eyre::Result<()>;
130}
131
132impl<Node, EthApi, F> OnRpcStarted<Node, EthApi> for F
133where
134 F: FnOnce(RpcContext<'_, Node, EthApi>, RethRpcServerHandles) -> eyre::Result<()> + Send,
135 Node: FullNodeComponents,
136 EthApi: EthApiTypes,
137{
138 fn on_rpc_started(
139 self: Box<Self>,
140 ctx: RpcContext<'_, Node, EthApi>,
141 handles: RethRpcServerHandles,
142 ) -> eyre::Result<()> {
143 (*self)(ctx, handles)
144 }
145}
146
147impl<Node, EthApi> OnRpcStarted<Node, EthApi> for ()
148where
149 Node: FullNodeComponents,
150 EthApi: EthApiTypes,
151{
152 fn on_rpc_started(
153 self: Box<Self>,
154 _: RpcContext<'_, Node, EthApi>,
155 _: RethRpcServerHandles,
156 ) -> eyre::Result<()> {
157 Ok(())
158 }
159}
160
161pub trait ExtendRpcModules<Node: FullNodeComponents, EthApi: EthApiTypes>: Send {
163 fn extend_rpc_modules(self: Box<Self>, ctx: RpcContext<'_, Node, EthApi>) -> eyre::Result<()>;
165}
166
167impl<Node, EthApi, F> ExtendRpcModules<Node, EthApi> for F
168where
169 F: FnOnce(RpcContext<'_, Node, EthApi>) -> eyre::Result<()> + Send,
170 Node: FullNodeComponents,
171 EthApi: EthApiTypes,
172{
173 fn extend_rpc_modules(self: Box<Self>, ctx: RpcContext<'_, Node, EthApi>) -> eyre::Result<()> {
174 (*self)(ctx)
175 }
176}
177
178impl<Node, EthApi> ExtendRpcModules<Node, EthApi> for ()
179where
180 Node: FullNodeComponents,
181 EthApi: EthApiTypes,
182{
183 fn extend_rpc_modules(self: Box<Self>, _: RpcContext<'_, Node, EthApi>) -> eyre::Result<()> {
184 Ok(())
185 }
186}
187
188#[derive(Debug, Clone)]
190#[expect(clippy::type_complexity)]
191pub struct RpcRegistry<Node: FullNodeComponents, EthApi: EthApiTypes> {
192 pub(crate) registry: RpcRegistryInner<
193 Node::Provider,
194 Node::Pool,
195 Node::Network,
196 TaskExecutor,
197 EthApi,
198 Node::Executor,
199 Node::Consensus,
200 >,
201}
202
203impl<Node, EthApi> Deref for RpcRegistry<Node, EthApi>
204where
205 Node: FullNodeComponents,
206 EthApi: EthApiTypes,
207{
208 type Target = RpcRegistryInner<
209 Node::Provider,
210 Node::Pool,
211 Node::Network,
212 TaskExecutor,
213 EthApi,
214 Node::Executor,
215 Node::Consensus,
216 >;
217
218 fn deref(&self) -> &Self::Target {
219 &self.registry
220 }
221}
222
223impl<Node, EthApi> DerefMut for RpcRegistry<Node, EthApi>
224where
225 Node: FullNodeComponents,
226 EthApi: EthApiTypes,
227{
228 fn deref_mut(&mut self) -> &mut Self::Target {
229 &mut self.registry
230 }
231}
232
233#[expect(missing_debug_implementations)]
241pub struct RpcContext<'a, Node: FullNodeComponents, EthApi: EthApiTypes> {
242 pub(crate) node: Node,
244
245 pub(crate) config: &'a NodeConfig<<Node::Types as NodeTypes>::ChainSpec>,
247
248 pub registry: &'a mut RpcRegistry<Node, EthApi>,
252 pub modules: &'a mut TransportRpcModules,
257 pub auth_module: &'a mut AuthRpcModule,
261}
262
263impl<Node, EthApi> RpcContext<'_, Node, EthApi>
264where
265 Node: FullNodeComponents,
266 EthApi: EthApiTypes,
267{
268 pub const fn config(&self) -> &NodeConfig<<Node::Types as NodeTypes>::ChainSpec> {
270 self.config
271 }
272
273 pub const fn node(&self) -> &Node {
275 &self.node
276 }
277
278 pub fn pool(&self) -> &Node::Pool {
280 self.node.pool()
281 }
282
283 pub fn provider(&self) -> &Node::Provider {
285 self.node.provider()
286 }
287
288 pub fn network(&self) -> &Node::Network {
290 self.node.network()
291 }
292
293 pub fn payload_builder_handle(
295 &self,
296 ) -> &PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload> {
297 self.node.payload_builder_handle()
298 }
299}
300
301pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
303 pub rpc_server_handles: RethRpcServerHandles,
305 pub rpc_registry: RpcRegistry<Node, EthApi>,
307 pub engine_events:
312 EventSender<BeaconConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
313 pub beacon_engine_handle: BeaconConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
315}
316
317impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
318 fn clone(&self) -> Self {
319 Self {
320 rpc_server_handles: self.rpc_server_handles.clone(),
321 rpc_registry: self.rpc_registry.clone(),
322 engine_events: self.engine_events.clone(),
323 beacon_engine_handle: self.beacon_engine_handle.clone(),
324 }
325 }
326}
327
328impl<Node: FullNodeComponents, EthApi: EthApiTypes> Deref for RpcHandle<Node, EthApi> {
329 type Target = RpcRegistry<Node, EthApi>;
330
331 fn deref(&self) -> &Self::Target {
332 &self.rpc_registry
333 }
334}
335
336impl<Node: FullNodeComponents, EthApi: EthApiTypes> Debug for RpcHandle<Node, EthApi>
337where
338 RpcRegistry<Node, EthApi>: Debug,
339{
340 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341 f.debug_struct("RpcHandle")
342 .field("rpc_server_handles", &self.rpc_server_handles)
343 .field("rpc_registry", &self.rpc_registry)
344 .finish()
345 }
346}
347
348pub struct RpcAddOns<
350 Node: FullNodeComponents,
351 EthB: EthApiBuilder<Node>,
352 EV,
353 EB = BasicEngineApiBuilder<EV>,
354> {
355 pub hooks: RpcHooks<Node, EthB::EthApi>,
357 eth_api_builder: EthB,
359 engine_validator_builder: EV,
361 engine_api_builder: EB,
363}
364
365impl<Node, EthB, EV, EB> Debug for RpcAddOns<Node, EthB, EV, EB>
366where
367 Node: FullNodeComponents,
368 EthB: EthApiBuilder<Node>,
369 EV: Debug,
370 EB: Debug,
371{
372 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373 f.debug_struct("RpcAddOns")
374 .field("hooks", &self.hooks)
375 .field("eth_api_builder", &"...")
376 .field("engine_validator_builder", &self.engine_validator_builder)
377 .field("engine_api_builder", &self.engine_api_builder)
378 .finish()
379 }
380}
381
382impl<Node, EthB, EV, EB> RpcAddOns<Node, EthB, EV, EB>
383where
384 Node: FullNodeComponents,
385 EthB: EthApiBuilder<Node>,
386{
387 pub fn new(
389 eth_api_builder: EthB,
390 engine_validator_builder: EV,
391 engine_api_builder: EB,
392 ) -> Self {
393 Self {
394 hooks: RpcHooks::default(),
395 eth_api_builder,
396 engine_validator_builder,
397 engine_api_builder,
398 }
399 }
400
401 pub fn on_rpc_started<F>(mut self, hook: F) -> Self
403 where
404 F: FnOnce(RpcContext<'_, Node, EthB::EthApi>, RethRpcServerHandles) -> eyre::Result<()>
405 + Send
406 + 'static,
407 {
408 self.hooks.set_on_rpc_started(hook);
409 self
410 }
411
412 pub fn extend_rpc_modules<F>(mut self, hook: F) -> Self
414 where
415 F: FnOnce(RpcContext<'_, Node, EthB::EthApi>) -> eyre::Result<()> + Send + 'static,
416 {
417 self.hooks.set_extend_rpc_modules(hook);
418 self
419 }
420}
421
422impl<Node, EthB, EV, EB> Default for RpcAddOns<Node, EthB, EV, EB>
423where
424 Node: FullNodeComponents,
425 EthB: EthApiBuilder<Node>,
426 EV: Default,
427 EB: Default,
428{
429 fn default() -> Self {
430 Self::new(EthB::default(), EV::default(), EB::default())
431 }
432}
433
434impl<N, EthB, EV, EB> RpcAddOns<N, EthB, EV, EB>
435where
436 N: FullNodeComponents,
437 N::Provider: ChainSpecProvider<ChainSpec: EthereumHardforks>,
438 EthB: EthApiBuilder<N>,
439 EV: EngineValidatorBuilder<N>,
440 EB: EngineApiBuilder<N>,
441{
442 pub async fn launch_add_ons_with<F>(
445 self,
446 ctx: AddOnsContext<'_, N>,
447 ext: F,
448 ) -> eyre::Result<RpcHandle<N, EthB::EthApi>>
449 where
450 F: FnOnce(
451 &mut TransportRpcModules,
452 &mut AuthRpcModule,
453 &mut RpcRegistry<N, EthB::EthApi>,
454 ) -> eyre::Result<()>,
455 {
456 let Self { eth_api_builder, engine_api_builder, hooks, .. } = self;
457
458 let engine_api = engine_api_builder.build_engine_api(&ctx).await?;
459 let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx;
460
461 info!(target: "reth::cli", "Engine API handler initialized");
462
463 let cache = EthStateCache::spawn_with(
464 node.provider().clone(),
465 config.rpc.eth_config().cache,
466 node.task_executor().clone(),
467 );
468
469 let new_canonical_blocks = node.provider().canonical_state_stream();
470 let c = cache.clone();
471 node.task_executor().spawn_critical(
472 "cache canonical blocks task",
473 Box::pin(async move {
474 cache_new_blocks_task(c, new_canonical_blocks).await;
475 }),
476 );
477
478 let ctx = EthApiCtx { components: &node, config: config.rpc.eth_config(), cache };
479 let eth_api = eth_api_builder.build_eth_api(ctx).await?;
480
481 let auth_config = config.rpc.auth_server_config(jwt_secret)?;
482 let module_config = config.rpc.transport_rpc_module_config();
483 debug!(target: "reth::cli", http=?module_config.http(), ws=?module_config.ws(), "Using RPC module config");
484
485 let (mut modules, mut auth_module, registry) = RpcModuleBuilder::default()
486 .with_provider(node.provider().clone())
487 .with_pool(node.pool().clone())
488 .with_network(node.network().clone())
489 .with_executor(node.task_executor().clone())
490 .with_evm_config(node.evm_config().clone())
491 .with_block_executor(node.block_executor().clone())
492 .with_consensus(node.consensus().clone())
493 .build_with_auth_server(module_config, engine_api, eth_api);
494
495 if config.dev.dev {
497 registry.eth_api().with_dev_accounts();
498 }
499
500 let mut registry = RpcRegistry { registry };
501 let ctx = RpcContext {
502 node: node.clone(),
503 config,
504 registry: &mut registry,
505 modules: &mut modules,
506 auth_module: &mut auth_module,
507 };
508
509 let RpcHooks { on_rpc_started, extend_rpc_modules } = hooks;
510
511 ext(ctx.modules, ctx.auth_module, ctx.registry)?;
512 extend_rpc_modules.extend_rpc_modules(ctx)?;
513
514 let server_config = config.rpc.rpc_server_config();
515 let cloned_modules = modules.clone();
516 let launch_rpc = server_config.start(&cloned_modules).map_ok(|handle| {
517 if let Some(path) = handle.ipc_endpoint() {
518 info!(target: "reth::cli", %path, "RPC IPC server started");
519 }
520 if let Some(addr) = handle.http_local_addr() {
521 info!(target: "reth::cli", url=%addr, "RPC HTTP server started");
522 }
523 if let Some(addr) = handle.ws_local_addr() {
524 info!(target: "reth::cli", url=%addr, "RPC WS server started");
525 }
526 handle
527 });
528
529 let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| {
530 let addr = handle.local_addr();
531 if let Some(ipc_endpoint) = handle.ipc_endpoint() {
532 info!(target: "reth::cli", url=%addr, ipc_endpoint=%ipc_endpoint, "RPC auth server started");
533 } else {
534 info!(target: "reth::cli", url=%addr, "RPC auth server started");
535 }
536 handle
537 });
538
539 let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?;
541
542 let handles = RethRpcServerHandles { rpc, auth };
543
544 let ctx = RpcContext {
545 node: node.clone(),
546 config,
547 registry: &mut registry,
548 modules: &mut modules,
549 auth_module: &mut auth_module,
550 };
551
552 on_rpc_started.on_rpc_started(ctx, handles.clone())?;
553
554 Ok(RpcHandle {
555 rpc_server_handles: handles,
556 rpc_registry: registry,
557 engine_events,
558 beacon_engine_handle,
559 })
560 }
561}
562
563impl<N, EthB, EV, EB> NodeAddOns<N> for RpcAddOns<N, EthB, EV, EB>
564where
565 N: FullNodeComponents,
566 <N as FullNodeTypes>::Provider: ChainSpecProvider<ChainSpec: EthereumHardforks>,
567 EthB: EthApiBuilder<N>,
568 EV: EngineValidatorBuilder<N>,
569 EB: EngineApiBuilder<N>,
570{
571 type Handle = RpcHandle<N, EthB::EthApi>;
572
573 async fn launch_add_ons(self, ctx: AddOnsContext<'_, N>) -> eyre::Result<Self::Handle> {
574 self.launch_add_ons_with(ctx, |_, _, _| Ok(())).await
575 }
576}
577
578pub trait RethRpcAddOns<N: FullNodeComponents>:
581 NodeAddOns<N, Handle = RpcHandle<N, Self::EthApi>>
582{
583 type EthApi: EthApiTypes;
585
586 fn hooks_mut(&mut self) -> &mut RpcHooks<N, Self::EthApi>;
588}
589
590impl<N: FullNodeComponents, EthB, EV, EB> RethRpcAddOns<N> for RpcAddOns<N, EthB, EV, EB>
591where
592 Self: NodeAddOns<N, Handle = RpcHandle<N, EthB::EthApi>>,
593 EthB: EthApiBuilder<N>,
594{
595 type EthApi = EthB::EthApi;
596
597 fn hooks_mut(&mut self) -> &mut RpcHooks<N, Self::EthApi> {
598 &mut self.hooks
599 }
600}
601
602#[derive(Debug)]
605pub struct EthApiCtx<'a, N: FullNodeTypes> {
606 pub components: &'a N,
608 pub config: EthConfig,
610 pub cache: EthStateCache<BlockTy<N::Types>, ReceiptTy<N::Types>>,
612}
613
614pub trait EthApiBuilder<N: FullNodeComponents>: Default + Send + 'static {
616 type EthApi: EthApiTypes
618 + FullEthApiServer<Provider = N::Provider, Pool = N::Pool>
619 + AddDevSigners
620 + Unpin
621 + 'static;
622
623 fn build_eth_api(
625 self,
626 ctx: EthApiCtx<'_, N>,
627 ) -> impl Future<Output = eyre::Result<Self::EthApi>> + Send;
628}
629
630pub trait EngineValidatorAddOn<Node: FullNodeComponents>: Send {
632 type Validator: EngineValidator<<Node::Types as NodeTypes>::Payload, Block = BlockTy<Node::Types>>
634 + Clone;
635
636 fn engine_validator(
638 &self,
639 ctx: &AddOnsContext<'_, Node>,
640 ) -> impl Future<Output = eyre::Result<Self::Validator>>;
641}
642
643impl<N, EthB, EV, EB> EngineValidatorAddOn<N> for RpcAddOns<N, EthB, EV, EB>
644where
645 N: FullNodeComponents,
646 EthB: EthApiBuilder<N>,
647 EV: EngineValidatorBuilder<N>,
648 EB: EngineApiBuilder<N>,
649{
650 type Validator = EV::Validator;
651
652 async fn engine_validator(&self, ctx: &AddOnsContext<'_, N>) -> eyre::Result<Self::Validator> {
653 self.engine_validator_builder.clone().build(ctx).await
654 }
655}
656
657pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone {
659 type Validator: EngineValidator<<Node::Types as NodeTypes>::Payload, Block = BlockTy<Node::Types>>
661 + Clone;
662
663 fn build(
665 self,
666 ctx: &AddOnsContext<'_, Node>,
667 ) -> impl Future<Output = eyre::Result<Self::Validator>> + Send;
668}
669
670impl<Node, F, Fut, Validator> EngineValidatorBuilder<Node> for F
671where
672 Node: FullNodeComponents,
673 Validator: EngineValidator<<Node::Types as NodeTypes>::Payload, Block = BlockTy<Node::Types>>
674 + Clone
675 + Unpin
676 + 'static,
677 F: FnOnce(&AddOnsContext<'_, Node>) -> Fut + Send + Sync + Clone,
678 Fut: Future<Output = eyre::Result<Validator>> + Send,
679{
680 type Validator = Validator;
681
682 fn build(
683 self,
684 ctx: &AddOnsContext<'_, Node>,
685 ) -> impl Future<Output = eyre::Result<Self::Validator>> {
686 self(ctx)
687 }
688}
689
690pub trait EngineApiBuilder<Node: FullNodeComponents>: Send + Sync {
692 type EngineApi: IntoEngineApiRpcModule + Send + Sync;
694
695 fn build_engine_api(
697 self,
698 ctx: &AddOnsContext<'_, Node>,
699 ) -> impl Future<Output = eyre::Result<Self::EngineApi>> + Send;
700}
701
702#[derive(Debug, Default)]
704pub struct BasicEngineApiBuilder<EV> {
705 engine_validator_builder: EV,
706}
707
708impl<N, EV> EngineApiBuilder<N> for BasicEngineApiBuilder<EV>
709where
710 N: FullNodeComponents<
711 Types: NodeTypes<
712 ChainSpec: EthereumHardforks,
713 Payload: PayloadTypes<ExecutionData = ExecutionData> + EngineTypes,
714 >,
715 >,
716 EV: EngineValidatorBuilder<N>,
717{
718 type EngineApi = EngineApi<
719 N::Provider,
720 <N::Types as NodeTypes>::Payload,
721 N::Pool,
722 EV::Validator,
723 <N::Types as NodeTypes>::ChainSpec,
724 >;
725
726 async fn build_engine_api(self, ctx: &AddOnsContext<'_, N>) -> eyre::Result<Self::EngineApi> {
727 let Self { engine_validator_builder } = self;
728
729 let engine_validator = engine_validator_builder.build(ctx).await?;
730 let client = ClientVersionV1 {
731 code: CLIENT_CODE,
732 name: NAME_CLIENT.to_string(),
733 version: CARGO_PKG_VERSION.to_string(),
734 commit: VERGEN_GIT_SHA.to_string(),
735 };
736 Ok(EngineApi::new(
737 ctx.node.provider().clone(),
738 ctx.config.chain.clone(),
739 ctx.beacon_engine_handle.clone(),
740 PayloadStore::new(ctx.node.payload_builder_handle().clone()),
741 ctx.node.pool().clone(),
742 Box::new(ctx.node.task_executor().clone()),
743 client,
744 EngineCapabilities::default(),
745 engine_validator,
746 ctx.config.engine.accept_execution_requests_hash,
747 ))
748 }
749}