reth_node_builder/components/
payload.rs

1//! Payload service component for the node builder.
2
3use crate::{BuilderContext, FullNodeTypes};
4use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
5use reth_chain_state::CanonStateSubscriptions;
6use reth_node_api::{NodeTypes, PayloadBuilderFor};
7use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand};
8use reth_transaction_pool::TransactionPool;
9use std::future::Future;
10use tokio::sync::{broadcast, mpsc};
11use tracing::warn;
12
13/// A type that knows how to spawn the payload service.
14pub trait PayloadServiceBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
15    Send + Sized
16{
17    /// Spawns the [`PayloadBuilderService`] and returns the handle to it for use by the engine.
18    ///
19    /// We provide default implementation via [`BasicPayloadJobGenerator`] but it can be overridden
20    /// for custom job orchestration logic,
21    fn spawn_payload_builder_service(
22        self,
23        ctx: &BuilderContext<Node>,
24        pool: Pool,
25        evm_config: EvmConfig,
26    ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
27           + Send;
28}
29
30impl<Node, F, Fut, Pool, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig> for F
31where
32    Node: FullNodeTypes,
33    Pool: TransactionPool,
34    F: Fn(&BuilderContext<Node>, Pool, EvmConfig) -> Fut + Send,
35    Fut: Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
36        + Send,
37{
38    fn spawn_payload_builder_service(
39        self,
40        ctx: &BuilderContext<Node>,
41        pool: Pool,
42        evm_config: EvmConfig,
43    ) -> impl Future<Output = eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>>>
44    {
45        self(ctx, pool, evm_config)
46    }
47}
48
49/// A type that knows how to build a payload builder to plug into [`BasicPayloadServiceBuilder`].
50pub trait PayloadBuilderBuilder<Node: FullNodeTypes, Pool: TransactionPool, EvmConfig>:
51    Send + Sized
52{
53    /// Payload builder implementation.
54    type PayloadBuilder: PayloadBuilderFor<Node::Types> + Unpin + 'static;
55
56    /// Spawns the payload service and returns the handle to it.
57    ///
58    /// The [`BuilderContext`] is provided to allow access to the node's configuration.
59    fn build_payload_builder(
60        self,
61        ctx: &BuilderContext<Node>,
62        pool: Pool,
63        evm_config: EvmConfig,
64    ) -> impl Future<Output = eyre::Result<Self::PayloadBuilder>> + Send;
65}
66
/// Basic payload service builder that spawns a [`BasicPayloadJobGenerator`].
///
/// This is a thin newtype around the value `PB` it is constructed with.
#[derive(Clone, Debug, Default)]
pub struct BasicPayloadServiceBuilder<PB>(PB);

impl<PB> BasicPayloadServiceBuilder<PB> {
    /// Wraps `payload_builder_builder` in a new [`BasicPayloadServiceBuilder`].
    ///
    /// This is a `const fn`, so it can also be used in constant contexts.
    pub const fn new(payload_builder_builder: PB) -> Self {
        Self(payload_builder_builder)
    }
}
77
78impl<Node, Pool, PB, EvmConfig> PayloadServiceBuilder<Node, Pool, EvmConfig>
79    for BasicPayloadServiceBuilder<PB>
80where
81    Node: FullNodeTypes,
82    Pool: TransactionPool,
83    EvmConfig: Send,
84    PB: PayloadBuilderBuilder<Node, Pool, EvmConfig>,
85{
86    async fn spawn_payload_builder_service(
87        self,
88        ctx: &BuilderContext<Node>,
89        pool: Pool,
90        evm_config: EvmConfig,
91    ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
92        let payload_builder = self.0.build_payload_builder(ctx, pool, evm_config).await?;
93
94        let conf = ctx.config().builder.clone();
95
96        let payload_job_config = BasicPayloadJobGeneratorConfig::default()
97            .interval(conf.interval)
98            .deadline(conf.deadline)
99            .max_payload_tasks(conf.max_payload_tasks);
100
101        let payload_generator = BasicPayloadJobGenerator::with_builder(
102            ctx.provider().clone(),
103            ctx.task_executor().clone(),
104            payload_job_config,
105            payload_builder,
106        );
107        let (payload_service, payload_service_handle) =
108            PayloadBuilderService::new(payload_generator, ctx.provider().canonical_state_stream());
109
110        ctx.task_executor().spawn_critical("payload builder service", Box::pin(payload_service));
111
112        Ok(payload_service_handle)
113    }
114}
115
/// A no-op payload service builder.
///
/// Useful for node implementations that are not implementing validating/sequencing logic.
#[derive(Clone, Copy, Debug, Default)]
#[non_exhaustive]
pub struct NoopPayloadServiceBuilder;
121
122impl<Node, Pool, Evm> PayloadServiceBuilder<Node, Pool, Evm> for NoopPayloadServiceBuilder
123where
124    Node: FullNodeTypes,
125    Pool: TransactionPool,
126    Evm: Send,
127{
128    async fn spawn_payload_builder_service(
129        self,
130        ctx: &BuilderContext<Node>,
131        _pool: Pool,
132        _evm_config: Evm,
133    ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
134        let (tx, mut rx) = mpsc::unbounded_channel();
135
136        ctx.task_executor().spawn_critical("payload builder", async move {
137            #[allow(clippy::collection_is_never_read)]
138            let mut subscriptions = Vec::new();
139
140            while let Some(message) = rx.recv().await {
141                match message {
142                    PayloadServiceCommand::Subscribe(tx) => {
143                        let (events_tx, events_rx) = broadcast::channel(100);
144                        // Retain senders to make sure that channels are not getting closed
145                        subscriptions.push(events_tx);
146                        let _ = tx.send(events_rx);
147                    }
148                    message => warn!(?message, "Noop payload service received a message"),
149                }
150            }
151        });
152
153        Ok(PayloadBuilderHandle::new(tx))
154    }
155}