reth_node_builder/components/
payload.rs1use crate::{BuilderContext, FullNodeTypes};
4use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
5use reth_chain_state::CanonStateSubscriptions;
6use reth_node_api::{NodeTypes, PayloadBuilderFor};
7use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand};
8use reth_transaction_pool::TransactionPool;
9use std::future::Future;
10use tokio::sync::{broadcast, mpsc};
11use tracing::warn;
12
13pub trait PayloadServiceBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
15 Send + Sized
16{
17 fn spawn_payload_builder_service(
22 self,
23 ctx: &BuilderContext<Node>,
24 pool: Pool,
25 evm_config: EvmConfig,
26 ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
27 + Send;
28}
29
30impl<Node, F, Fut, Pool, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig> for F
31where
32 Node: FullNodeTypes,
33 Pool: TransactionPool,
34 F: Fn(&BuilderContext<Node>, Pool, EvmConfig) -> Fut + Send,
35 Fut: Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
36 + Send,
37{
38 fn spawn_payload_builder_service(
39 self,
40 ctx: &BuilderContext<Node>,
41 pool: Pool,
42 evm_config: EvmConfig,
43 ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
44 {
45 self(ctx, pool, evm_config)
46 }
47}
48
49pub trait PayloadBuilderBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
51 Send + Sized
52{
53 type PayloadBuilder: PayloadBuilderFor<Node::Types> + Unpin + 'static;
55
56 fn build_payload_builder(
60 self,
61 ctx: &BuilderContext<Node>,
62 pool: Pool,
63 evm_config: EvmConfig,
64 ) -> impl Future<Output = eyre::Result<Self::PayloadBuilder>> + Send;
65}
66
67#[derive(Debug, Default, Clone)]
69pub struct BasicPayloadServiceBuilder<PB>(PB);
70
71impl<PB> BasicPayloadServiceBuilder<PB> {
72 pub const fn new(payload_builder_builder: PB) -> Self {
74 Self(payload_builder_builder)
75 }
76}
77
78impl<Node, Pool, PB, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig>
79 for BasicPayloadServiceBuilder<PB>
80where
81 Node: FullNodeTypes,
82 Pool: TransactionPool,
83 EvmConfig: Send,
84 PB: PayloadBuilderBuilder<Node, Pool, EvmConfig>,
85{
86 async fn spawn_payload_builder_service(
87 self,
88 ctx: &BuilderContext<Node>,
89 pool: Pool,
90 evm_config: EvmConfig,
91 ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
92 let payload_builder = self.0.build_payload_builder(ctx, pool, evm_config).await?;
93
94 let conf = ctx.config().builder.clone();
95
96 let payload_job_config = BasicPayloadJobGeneratorConfig::default()
97 .interval(conf.interval)
98 .deadline(conf.deadline)
99 .max_payload_tasks(conf.max_payload_tasks);
100
101 let payload_generator = BasicPayloadJobGenerator::with_builder(
102 ctx.provider().clone(),
103 ctx.task_executor().clone(),
104 payload_job_config,
105 payload_builder,
106 );
107 let (payload_service, payload_service_handle) =
108 PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());
109
110 ctx.task_executor().spawn_critical("payload builder service", Box::pin(payload_service));
111
112 Ok(payload_service_handle)
113 }
114}
115
116#[derive(Debug, Clone, Copy, Default)]
119#[non_exhaustive]
120pub struct NoopPayloadServiceBuilder;
121
122impl<Node, Pool, Evm> PayloadServiceBuilder<Node, Pool, Evm> for NoopPayloadServiceBuilder
123where
124 Node: FullNodeTypes,
125 Pool: TransactionPool,
126 Evm: Send,
127{
128 async fn spawn_payload_builder_service(
129 self,
130 ctx: &BuilderContext<Node>,
131 _pool: Pool,
132 _evm_config: Evm,
133 ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
134 let (tx, mut rx) = mpsc::unbounded_channel();
135
136 ctx.task_executor().spawn_critical("payload builder", async move {
137 #[allow(clippy::collection_is_never_read)]
138 let mut subscriptions = Vec::new();
139
140 while let Some(message) = rx.recv().await {
141 match message {
142 PayloadServiceCommand::Subscribe(tx) => {
143 let (events_tx, events_rx) = broadcast::channel(100);
144 subscriptions.push(events_tx);
146 let _ = tx.send(events_rx);
147 }
148 message => warn!(?message, "Noop payload service received a message"),
149 }
150 }
151 });
152
153 Ok(PayloadBuilderHandle::new(tx))
154 }
155}