reth_node_builder/components/
payload.rs1use crate::{BuilderContext, FullNodeTypes};
4use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
5use reth_chain_state::CanonStateSubscriptions;
6use reth_node_api::{NodeTypes, PayloadBuilderFor};
7use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand};
8use reth_transaction_pool::TransactionPool;
9use std::future::Future;
10use tokio::sync::{broadcast, mpsc};
11use tracing::warn;
12
13pub trait PayloadServiceBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
15 Send + Sized
16{
17 fn spawn_payload_builder_service(
22 self,
23 ctx: &BuilderContext<Node>,
24 pool: Pool,
25 evm_config: EvmConfig,
26 ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
27 + Send;
28}
29
30impl<Node, F, Fut, Pool, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig> for F
31where
32 Node: FullNodeTypes,
33 Pool: TransactionPool,
34 F: Fn(&BuilderContext<Node>, Pool, EvmConfig) -> Fut + Send,
35 Fut: Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
36 + Send,
37{
38 fn spawn_payload_builder_service(
39 self,
40 ctx: &BuilderContext<Node>,
41 pool: Pool,
42 evm_config: EvmConfig,
43 ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
44 {
45 self(ctx, pool, evm_config)
46 }
47}
48
49pub trait PayloadBuilderBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
51 Send + Sized
52{
53 type PayloadBuilder: PayloadBuilderFor<Node::Types> + Unpin + 'static;
55
56 fn build_payload_builder(
60 self,
61 ctx: &BuilderContext<Node>,
62 pool: Pool,
63 evm_config: EvmConfig,
64 ) -> impl Future<Output = eyre::Result<Self::PayloadBuilder>> + Send;
65}
66
/// Newtype wrapper around a payload-builder builder of type `PB`.
///
/// The wrapped value is kept in the single private tuple field.
#[derive(Clone, Debug, Default)]
pub struct BasicPayloadServiceBuilder<PB>(PB);

impl<PB> BasicPayloadServiceBuilder<PB> {
    /// Wraps `payload_builder_builder` in a new [`BasicPayloadServiceBuilder`].
    pub const fn new(payload_builder_builder: PB) -> Self {
        BasicPayloadServiceBuilder(payload_builder_builder)
    }
}
77
78impl<Node, Pool, PB, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig>
79 for BasicPayloadServiceBuilder<PB>
80where
81 Node: FullNodeTypes,
82 Pool: TransactionPool,
83 EvmConfig: Send,
84 PB: PayloadBuilderBuilder<Node, Pool, EvmConfig>,
85{
86 async fn spawn_payload_builder_service(
87 self,
88 ctx: &BuilderContext<Node>,
89 pool: Pool,
90 evm_config: EvmConfig,
91 ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
92 let payload_builder = self.0.build_payload_builder(ctx, pool, evm_config).await?;
93
94 let conf = ctx.config().builder.clone();
95
96 let payload_job_config = BasicPayloadJobGeneratorConfig::default()
97 .interval(conf.interval)
98 .deadline(conf.deadline)
99 .max_payload_tasks(conf.max_payload_tasks);
100
101 let payload_generator = BasicPayloadJobGenerator::with_builder(
102 ctx.provider().clone(),
103 ctx.task_executor().clone(),
104 payload_job_config,
105 payload_builder,
106 );
107 let (payload_service, payload_service_handle) =
108 PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());
109
110 ctx.task_executor()
111 .spawn_critical_task("payload builder service", Box::pin(payload_service));
112
113 Ok(payload_service_handle)
114 }
115}
116
/// A stateless unit-struct payload service builder.
///
/// Marked `#[non_exhaustive]`, so outside this crate it cannot be built with a struct
/// expression; use [`Default`] instead.
#[derive(Clone, Copy, Debug, Default)]
#[non_exhaustive]
pub struct NoopPayloadServiceBuilder;
122
123impl<Node, Pool, Evm> PayloadServiceBuilder<Node, Pool, Evm> for NoopPayloadServiceBuilder
124where
125 Node: FullNodeTypes,
126 Pool: TransactionPool,
127 Evm: Send,
128{
129 async fn spawn_payload_builder_service(
130 self,
131 ctx: &BuilderContext<Node>,
132 _pool: Pool,
133 _evm_config: Evm,
134 ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
135 let (tx, mut rx) = mpsc::unbounded_channel();
136
137 ctx.task_executor().spawn_critical_task("payload builder", async move {
138 #[allow(clippy::collection_is_never_read)]
139 let mut subscriptions = Vec::new();
140
141 while let Some(message) = rx.recv().await {
142 match message {
143 PayloadServiceCommand::Subscribe(tx) => {
144 let (events_tx, events_rx) = broadcast::channel(100);
145 subscriptions.push(events_tx);
147 let _ = tx.send(events_rx);
148 }
149 message => warn!(?message, "Noop payload service received a message"),
150 }
151 }
152 });
153
154 Ok(PayloadBuilderHandle::new(tx))
155 }
156}