Skip to main content

reth_node_builder/components/
payload.rs

1//! Payload service component for the node builder.
2
3use crate::{BuilderContext, FullNodeTypes};
4use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
5use reth_chain_state::CanonStateSubscriptions;
6use reth_node_api::{NodeTypes, PayloadBuilderFor};
7use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand};
8use reth_transaction_pool::TransactionPool;
9use std::future::Future;
10use tokio::sync::{broadcast, mpsc};
11use tracing::warn;
12
13/// A type that knows how to spawn the payload service.
14pub trait PayloadServiceBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
15    Send + Sized
16{
17    /// Spawns the [`PayloadBuilderService`] and returns the handle to it for use by the engine.
18    ///
19    /// We provide default implementation via [`BasicPayloadJobGenerator`] but it can be overridden
20    /// for custom job orchestration logic,
21    fn spawn_payload_builder_service(
22        self,
23        ctx: &BuilderContext<Node>,
24        pool: Pool,
25        evm_config: EvmConfig,
26    ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
27           + Send;
28}
29
30impl<Node, F, Fut, Pool, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig> for F
31where
32    Node: FullNodeTypes,
33    Pool: TransactionPool,
34    F: Fn(&BuilderContext<Node>, Pool, EvmConfig) -> Fut + Send,
35    Fut: Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
36        + Send,
37{
38    fn spawn_payload_builder_service(
39        self,
40        ctx: &BuilderContext<Node>,
41        pool: Pool,
42        evm_config: EvmConfig,
43    ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
44    {
45        self(ctx, pool, evm_config)
46    }
47}
48
49/// A type that knows how to build a payload builder to plug into [`BasicPayloadServiceBuilder`].
50pub trait PayloadBuilderBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
51    Send + Sized
52{
53    /// Payload builder implementation.
54    type PayloadBuilder: PayloadBuilderFor<Node::Types> + Unpin + 'static;
55
56    /// Spawns the payload service and returns the handle to it.
57    ///
58    /// The [`BuilderContext`] is provided to allow access to the node's configuration.
59    fn build_payload_builder(
60        self,
61        ctx: &BuilderContext<Node>,
62        pool: Pool,
63        evm_config: EvmConfig,
64    ) -> impl Future<Output = eyre::Result<Self::PayloadBuilder>> + Send;
65}
66
/// Basic payload service builder that spawns a [`BasicPayloadJobGenerator`].
///
/// Bundles the factory for the payload builder with the flag that controls pre-caching of
/// changed state.
#[derive(Debug, Clone)]
pub struct BasicPayloadServiceBuilder<PB> {
    /// Builds the payload builder used by generated payload jobs.
    payload_builder_builder: PB,
    /// Whether to pre-cache changed state from canonical state notifications.
    pre_cache_state: bool,
}

impl<PB> BasicPayloadServiceBuilder<PB> {
    /// Create a new [`BasicPayloadServiceBuilder`].
    ///
    /// State pre-caching is enabled by default; use [`Self::with_pre_cache_state`] to change it.
    #[must_use]
    pub const fn new(payload_builder_builder: PB) -> Self {
        Self { payload_builder_builder, pre_cache_state: true }
    }

    /// Sets whether to pre-cache changed state from canonical state notifications.
    ///
    /// Takes the builder by value and returns the updated builder, so the result must be used.
    #[must_use]
    pub const fn with_pre_cache_state(mut self, pre_cache_state: bool) -> Self {
        self.pre_cache_state = pre_cache_state;
        self
    }
}

impl<PB> Default for BasicPayloadServiceBuilder<PB>
where
    PB: Default,
{
    /// Equivalent to [`BasicPayloadServiceBuilder::new`] with `PB::default()`.
    fn default() -> Self {
        Self::new(PB::default())
    }
}
97
98impl<Node, Pool, PB, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig>
99    for BasicPayloadServiceBuilder<PB>
100where
101    Node: FullNodeTypes,
102    Pool: TransactionPool,
103    EvmConfig: Send,
104    PB: PayloadBuilderBuilder<Node, Pool, EvmConfig>,
105{
106    async fn spawn_payload_builder_service(
107        self,
108        ctx: &BuilderContext<Node>,
109        pool: Pool,
110        evm_config: EvmConfig,
111    ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
112        let Self { payload_builder_builder, pre_cache_state } = self;
113        let payload_builder =
114            payload_builder_builder.build_payload_builder(ctx, pool, evm_config).await?;
115
116        let conf = ctx.config().builder.clone();
117
118        let payload_job_config = BasicPayloadJobGeneratorConfig::default()
119            .interval(conf.interval)
120            .deadline(conf.deadline)
121            .max_payload_tasks(conf.max_payload_tasks)
122            .pre_cache_state(pre_cache_state);
123
124        let payload_generator = BasicPayloadJobGenerator::with_builder(
125            ctx.provider().clone(),
126            ctx.task_executor().clone(),
127            payload_job_config,
128            payload_builder,
129        );
130        let (payload_service, payload_service_handle) =
131            PayloadBuilderService::<_, _, <Node::Types as NodeTypes>::Payload>::new(
132                payload_generator,
133                ctx.provider().canonical_state_stream(),
134            );
135
136        ctx.task_executor().spawn_critical_os_thread(
137            "payload-service",
138            "payload builder service",
139            payload_service,
140        );
141
142        Ok(payload_service_handle)
143    }
144}
145
146/// A `NoopPayloadServiceBuilder` useful for node implementations that are not implementing
147/// validating/sequencing logic.
148#[derive(Debug, Clone, Copy, Default)]
149#[non_exhaustive]
150pub struct NoopPayloadServiceBuilder;
151
152impl<Node, Pool, Evm> PayloadServiceBuilder<Node, Pool, Evm> for NoopPayloadServiceBuilder
153where
154    Node: FullNodeTypes,
155    Pool: TransactionPool,
156    Evm: Send,
157{
158    async fn spawn_payload_builder_service(
159        self,
160        ctx: &BuilderContext<Node>,
161        _pool: Pool,
162        _evm_config: Evm,
163    ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
164        let (tx, mut rx) = mpsc::unbounded_channel();
165
166        ctx.task_executor().spawn_critical_os_thread(
167            "payload-service",
168            "payload builder service",
169            async move {
170                #[expect(clippy::collection_is_never_read)]
171                let mut subscriptions = Vec::new();
172
173                while let Some(message) = rx.recv().await {
174                    match message {
175                        PayloadServiceCommand::Subscribe(tx) => {
176                            let (events_tx, events_rx) = broadcast::channel(100);
177                            // Retain senders to make sure that channels are not getting closed
178                            subscriptions.push(events_tx);
179                            let _ = tx.send(events_rx);
180                        }
181                        message => warn!(?message, "Noop payload service received a message"),
182                    }
183                }
184            },
185        );
186
187        Ok(PayloadBuilderHandle::new(tx))
188    }
189}