reth_node_builder/components/
payload.rs1use crate::{BuilderContext, FullNodeTypes};
4use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
5use reth_chain_state::CanonStateSubscriptions;
6use reth_node_api::{NodeTypes, PayloadBuilderFor};
7use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand};
8use reth_transaction_pool::TransactionPool;
9use std::future::Future;
10use tokio::sync::{broadcast, mpsc};
11use tracing::warn;
12
13pub trait PayloadServiceBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
15 Send + Sized
16{
17 fn spawn_payload_builder_service(
22 self,
23 ctx: &BuilderContext<Node>,
24 pool: Pool,
25 evm_config: EvmConfig,
26 ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
27 + Send;
28}
29
30impl<Node, F, Fut, Pool, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig> for F
31where
32 Node: FullNodeTypes,
33 Pool: TransactionPool,
34 F: Fn(&BuilderContext<Node>, Pool, EvmConfig) -> Fut + Send,
35 Fut: Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
36 + Send,
37{
38 fn spawn_payload_builder_service(
39 self,
40 ctx: &BuilderContext<Node>,
41 pool: Pool,
42 evm_config: EvmConfig,
43 ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
44 {
45 self(ctx, pool, evm_config)
46 }
47}
48
49pub trait PayloadBuilderBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
51 Send + Sized
52{
53 type PayloadBuilder: PayloadBuilderFor<Node::Types> + Unpin + 'static;
55
56 fn build_payload_builder(
60 self,
61 ctx: &BuilderContext<Node>,
62 pool: Pool,
63 evm_config: EvmConfig,
64 ) -> impl Future<Output = eyre::Result<Self::PayloadBuilder>> + Send;
65}
66
/// Newtype wrapper around a payload-builder builder `PB`.
///
/// The wrapped value is private; it is set through [`BasicPayloadServiceBuilder::new`] (or
/// `Default` when `PB: Default`).
#[derive(Debug, Default, Clone)]
pub struct BasicPayloadServiceBuilder<PB>(PB);

impl<PB> BasicPayloadServiceBuilder<PB> {
    /// Creates a new instance wrapping the given payload-builder builder.
    pub const fn new(payload_builder_builder: PB) -> Self {
        Self(payload_builder_builder)
    }
}
77
78impl<Node, Pool, PB, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig>
79 for BasicPayloadServiceBuilder<PB>
80where
81 Node: FullNodeTypes,
82 Pool: TransactionPool,
83 EvmConfig: Send,
84 PB: PayloadBuilderBuilder<Node, Pool, EvmConfig>,
85{
86 async fn spawn_payload_builder_service(
87 self,
88 ctx: &BuilderContext<Node>,
89 pool: Pool,
90 evm_config: EvmConfig,
91 ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
92 let payload_builder = self.0.build_payload_builder(ctx, pool, evm_config).await?;
93
94 let conf = ctx.config().builder.clone();
95
96 let payload_job_config = BasicPayloadJobGeneratorConfig::default()
97 .interval(conf.interval)
98 .deadline(conf.deadline)
99 .max_payload_tasks(conf.max_payload_tasks);
100
101 let payload_generator = BasicPayloadJobGenerator::with_builder(
102 ctx.provider().clone(),
103 ctx.task_executor().clone(),
104 payload_job_config,
105 payload_builder,
106 );
107 let (payload_service, payload_service_handle) =
108 PayloadBuilderService::<_, _, <Node::Types as NodeTypes>::Payload>::new(
109 payload_generator,
110 ctx.provider().canonical_state_stream(),
111 );
112
113 ctx.task_executor().spawn_critical_os_thread(
114 "payload-builder",
115 "payload builder service",
116 payload_service,
117 );
118
119 Ok(payload_service_handle)
120 }
121}
122
/// A [`PayloadServiceBuilder`] whose spawned service builds nothing.
///
/// Its implementation in this file only answers subscription commands and logs a warning for
/// every other command it receives.
#[derive(Debug, Clone, Copy, Default)]
#[non_exhaustive]
pub struct NoopPayloadServiceBuilder;
128
129impl<Node, Pool, Evm> PayloadServiceBuilder<Node, Pool, Evm> for NoopPayloadServiceBuilder
130where
131 Node: FullNodeTypes,
132 Pool: TransactionPool,
133 Evm: Send,
134{
135 async fn spawn_payload_builder_service(
136 self,
137 ctx: &BuilderContext<Node>,
138 _pool: Pool,
139 _evm_config: Evm,
140 ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
141 let (tx, mut rx) = mpsc::unbounded_channel();
142
143 ctx.task_executor().spawn_critical_os_thread(
144 "payload-builder",
145 "payload builder",
146 async move {
147 #[expect(clippy::collection_is_never_read)]
148 let mut subscriptions = Vec::new();
149
150 while let Some(message) = rx.recv().await {
151 match message {
152 PayloadServiceCommand::Subscribe(tx) => {
153 let (events_tx, events_rx) = broadcast::channel(100);
154 subscriptions.push(events_tx);
156 let _ = tx.send(events_rx);
157 }
158 message => warn!(?message, "Noop payload service received a message"),
159 }
160 }
161 },
162 );
163
164 Ok(PayloadBuilderHandle::new(tx))
165 }
166}