reth_payload_builder/
service.rs

1//! Support for building payloads.
2//!
3//! The payload builder is responsible for building payloads.
4//! Once a new payload is created, it is continuously updated.
5
6use crate::{
7    metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8    PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_rpc_types::engine::PayloadId;
12use futures_util::{future::FutureExt, Stream, StreamExt};
13use reth_chain_state::CanonStateNotification;
14use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
15use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
16use reth_primitives_traits::NodePrimitives;
17use std::{
18    fmt,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23};
24use tokio::sync::{
25    broadcast, mpsc,
26    oneshot::{self, Receiver},
27};
28use tokio_stream::wrappers::UnboundedReceiverStream;
29use tracing::{debug, info, trace, warn};
30
31type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
32
33/// A communication channel to the [`PayloadBuilderService`] that can retrieve payloads.
34///
35/// This type is intended to be used to retrieve payloads from the service (e.g. from the engine
36/// API).
37#[derive(Debug)]
38pub struct PayloadStore<T: PayloadTypes> {
39    inner: Arc<PayloadBuilderHandle<T>>,
40}
41
42impl<T> PayloadStore<T>
43where
44    T: PayloadTypes,
45{
46    /// Resolves the payload job and returns the best payload that has been built so far.
47    ///
48    /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
49    /// job, See [`PayloadJob::resolve`].
50    pub async fn resolve_kind(
51        &self,
52        id: PayloadId,
53        kind: PayloadKind,
54    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
55        self.inner.resolve_kind(id, kind).await
56    }
57
58    /// Resolves the payload job and returns the best payload that has been built so far.
59    pub async fn resolve(
60        &self,
61        id: PayloadId,
62    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
63        self.resolve_kind(id, PayloadKind::Earliest).await
64    }
65
66    /// Returns the best payload for the given identifier.
67    ///
68    /// Note: this merely returns the best payload so far and does not resolve the job.
69    pub async fn best_payload(
70        &self,
71        id: PayloadId,
72    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
73        self.inner.best_payload(id).await
74    }
75
76    /// Returns the payload attributes associated with the given identifier.
77    ///
78    /// Note: this returns the attributes of the payload and does not resolve the job.
79    pub async fn payload_attributes(
80        &self,
81        id: PayloadId,
82    ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
83        self.inner.payload_attributes(id).await
84    }
85}
86
87impl<T> PayloadStore<T>
88where
89    T: PayloadTypes,
90{
91    /// Create a new instance
92    pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
93        Self { inner: Arc::new(inner) }
94    }
95}
96
97impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
98where
99    T: PayloadTypes,
100{
101    fn from(inner: PayloadBuilderHandle<T>) -> Self {
102        Self::new(inner)
103    }
104}
105
106/// A communication channel to the [`PayloadBuilderService`].
107///
108/// This is the API used to create new payloads and to get the current state of existing ones.
109#[derive(Debug)]
110pub struct PayloadBuilderHandle<T: PayloadTypes> {
111    /// Sender half of the message channel to the [`PayloadBuilderService`].
112    to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
113}
114
115impl<T: PayloadTypes> PayloadBuilderHandle<T> {
116    /// Creates a new payload builder handle for the given channel.
117    ///
118    /// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload
119    /// building flow See [`PayloadBuilderService::poll`] for implementation details.
120    pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
121        Self { to_service }
122    }
123
124    /// Sends a message to the service to start building a new payload for the given payload.
125    ///
126    /// Returns a receiver that will receive the payload id.
127    pub fn send_new_payload(
128        &self,
129        attr: T::PayloadBuilderAttributes,
130    ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
131        let (tx, rx) = oneshot::channel();
132        let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
133        rx
134    }
135
136    /// Returns the best payload for the given identifier.
137    /// Note: this does not resolve the job if it's still in progress.
138    pub async fn best_payload(
139        &self,
140        id: PayloadId,
141    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
142        let (tx, rx) = oneshot::channel();
143        self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
144        rx.await.ok()?
145    }
146
147    /// Resolves the payload job and returns the best payload that has been built so far.
148    pub async fn resolve_kind(
149        &self,
150        id: PayloadId,
151        kind: PayloadKind,
152    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
153        let (tx, rx) = oneshot::channel();
154        self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
155        match rx.await.transpose()? {
156            Ok(fut) => Some(fut.await),
157            Err(e) => Some(Err(e.into())),
158        }
159    }
160
161    /// Sends a message to the service to subscribe to payload events.
162    /// Returns a receiver that will receive them.
163    pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
164        let (tx, rx) = oneshot::channel();
165        let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
166        Ok(PayloadEvents { receiver: rx.await? })
167    }
168
169    /// Returns the payload attributes associated with the given identifier.
170    ///
171    /// Note: this returns the attributes of the payload and does not resolve the job.
172    pub async fn payload_attributes(
173        &self,
174        id: PayloadId,
175    ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
176        let (tx, rx) = oneshot::channel();
177        self.to_service.send(PayloadServiceCommand::PayloadAttributes(id, tx)).ok()?;
178        rx.await.ok()?
179    }
180}
181
182impl<T> Clone for PayloadBuilderHandle<T>
183where
184    T: PayloadTypes,
185{
186    fn clone(&self) -> Self {
187        Self { to_service: self.to_service.clone() }
188    }
189}
190
191/// A service that manages payload building tasks.
192///
193/// This type is an endless future that manages the building of payloads.
194///
195/// It tracks active payloads and their build jobs that run in a worker pool.
196///
197/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
198/// does know nothing about how to build them, it just drives their jobs to completion.
199#[derive(Debug)]
200#[must_use = "futures do nothing unless you `.await` or poll them"]
201pub struct PayloadBuilderService<Gen, St, T>
202where
203    T: PayloadTypes,
204    Gen: PayloadJobGenerator,
205    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
206{
207    /// The type that knows how to create new payloads.
208    generator: Gen,
209    /// All active payload jobs.
210    payload_jobs: Vec<(Gen::Job, PayloadId)>,
211    /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
212    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
213    /// Receiver half of the command channel.
214    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
215    /// Metrics for the payload builder service
216    metrics: PayloadBuilderServiceMetrics,
217    /// Chain events notification stream
218    chain_events: St,
219    /// Payload events handler, used to broadcast and subscribe to payload events.
220    payload_events: broadcast::Sender<Events<T>>,
221}
222
223const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
224
225// === impl PayloadBuilderService ===
226
227impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
228where
229    T: PayloadTypes,
230    Gen: PayloadJobGenerator,
231    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
232    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
233{
234    /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
235    /// with it.
236    ///
237    /// This also takes a stream of chain events that will be forwarded to the generator to apply
238    /// additional logic when new state is committed. See also
239    /// [`PayloadJobGenerator::on_new_state`].
240    pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
241        let (service_tx, command_rx) = mpsc::unbounded_channel();
242        let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
243
244        let service = Self {
245            generator,
246            payload_jobs: Vec::new(),
247            service_tx,
248            command_rx: UnboundedReceiverStream::new(command_rx),
249            metrics: Default::default(),
250            chain_events,
251            payload_events,
252        };
253
254        let handle = service.handle();
255        (service, handle)
256    }
257
258    /// Returns a handle to the service.
259    pub fn handle(&self) -> PayloadBuilderHandle<T> {
260        PayloadBuilderHandle::new(self.service_tx.clone())
261    }
262
263    /// Returns true if the given payload is currently being built.
264    fn contains_payload(&self, id: PayloadId) -> bool {
265        self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
266    }
267
268    /// Returns the best payload for the given identifier that has been built so far.
269    fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
270        let res = self
271            .payload_jobs
272            .iter()
273            .find(|(_, job_id)| *job_id == id)
274            .map(|(j, _)| j.best_payload().map(|p| p.into()));
275        if let Some(Ok(ref best)) = res {
276            self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
277        }
278
279        res
280    }
281
282    /// Returns the best payload for the given identifier that has been built so far and terminates
283    /// the job if requested.
284    fn resolve(
285        &mut self,
286        id: PayloadId,
287        kind: PayloadKind,
288    ) -> Option<PayloadFuture<T::BuiltPayload>> {
289        debug!(target: "payload_builder", %id, "resolving payload job");
290
291        let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
292        let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
293
294        if keep_alive == KeepPayloadJobAlive::No {
295            let (_, id) = self.payload_jobs.swap_remove(job);
296            debug!(target: "payload_builder", %id, "terminated resolved job");
297        }
298
299        // Since the fees will not be known until the payload future is resolved / awaited, we wrap
300        // the future in a new future that will update the metrics.
301        let resolved_metrics = self.metrics.clone();
302        let payload_events = self.payload_events.clone();
303
304        let fut = async move {
305            let res = fut.await;
306            if let Ok(payload) = &res {
307                if payload_events.receiver_count() > 0 {
308                    payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
309                }
310
311                resolved_metrics
312                    .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
313            }
314            res.map(|p| p.into())
315        };
316
317        Some(Box::pin(fut))
318    }
319}
320
321impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
322where
323    T: PayloadTypes,
324    Gen: PayloadJobGenerator,
325    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
326    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
327{
328    /// Returns the payload attributes for the given payload.
329    fn payload_attributes(
330        &self,
331        id: PayloadId,
332    ) -> Option<Result<<Gen::Job as PayloadJob>::PayloadAttributes, PayloadBuilderError>> {
333        let attributes = self
334            .payload_jobs
335            .iter()
336            .find(|(_, job_id)| *job_id == id)
337            .map(|(j, _)| j.payload_attributes());
338
339        if attributes.is_none() {
340            trace!(%id, "no matching payload job found to get attributes for");
341        }
342
343        attributes
344    }
345}
346
347impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
348where
349    T: PayloadTypes,
350    N: NodePrimitives,
351    Gen: PayloadJobGenerator + Unpin + 'static,
352    <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
353    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
354    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
355    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
356{
357    type Output = ();
358
359    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
360        let this = self.get_mut();
361        loop {
362            // notify the generator of new chain events
363            while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
364                this.generator.on_new_state(new_head);
365            }
366
367            // we poll all jobs first, so we always have the latest payload that we can report if
368            // requests
369            // we don't care about the order of the jobs, so we can just swap_remove them
370            for idx in (0..this.payload_jobs.len()).rev() {
371                let (mut job, id) = this.payload_jobs.swap_remove(idx);
372
373                // drain better payloads from the job
374                match job.poll_unpin(cx) {
375                    Poll::Ready(Ok(_)) => {
376                        this.metrics.set_active_jobs(this.payload_jobs.len());
377                        trace!(%id, "payload job finished");
378                    }
379                    Poll::Ready(Err(err)) => {
380                        warn!(%err, ?id, "Payload builder job failed; resolving payload");
381                        this.metrics.inc_failed_jobs();
382                        this.metrics.set_active_jobs(this.payload_jobs.len());
383                    }
384                    Poll::Pending => {
385                        // still pending, put it back
386                        this.payload_jobs.push((job, id));
387                    }
388                }
389            }
390
391            // marker for exit condition
392            let mut new_job = false;
393
394            // drain all requests
395            while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
396                match cmd {
397                    PayloadServiceCommand::BuildNewPayload(attr, tx) => {
398                        let id = attr.payload_id();
399                        let mut res = Ok(id);
400
401                        if this.contains_payload(id) {
402                            debug!(%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
403                        } else {
404                            // no job for this payload yet, create one
405                            let parent = attr.parent();
406                            match this.generator.new_payload_job(attr.clone()) {
407                                Ok(job) => {
408                                    info!(%id, %parent, "New payload job created");
409                                    this.metrics.inc_initiated_jobs();
410                                    new_job = true;
411                                    this.payload_jobs.push((job, id));
412                                    this.payload_events.send(Events::Attributes(attr.clone())).ok();
413                                }
414                                Err(err) => {
415                                    this.metrics.inc_failed_jobs();
416                                    warn!(%err, %id, "Failed to create payload builder job");
417                                    res = Err(err);
418                                }
419                            }
420                        }
421
422                        // return the id of the payload
423                        let _ = tx.send(res);
424                    }
425                    PayloadServiceCommand::BestPayload(id, tx) => {
426                        let _ = tx.send(this.best_payload(id));
427                    }
428                    PayloadServiceCommand::PayloadAttributes(id, tx) => {
429                        let attributes = this.payload_attributes(id);
430                        let _ = tx.send(attributes);
431                    }
432                    PayloadServiceCommand::Resolve(id, strategy, tx) => {
433                        let _ = tx.send(this.resolve(id, strategy));
434                    }
435                    PayloadServiceCommand::Subscribe(tx) => {
436                        let new_rx = this.payload_events.subscribe();
437                        let _ = tx.send(new_rx);
438                    }
439                }
440            }
441
442            if !new_job {
443                return Poll::Pending
444            }
445        }
446    }
447}
448
449/// Message type for the [`PayloadBuilderService`].
450pub enum PayloadServiceCommand<T: PayloadTypes> {
451    /// Start building a new payload.
452    BuildNewPayload(
453        T::PayloadBuilderAttributes,
454        oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
455    ),
456    /// Get the best payload so far
457    BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
458    /// Get the payload attributes for the given payload
459    PayloadAttributes(
460        PayloadId,
461        oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
462    ),
463    /// Resolve the payload and return the payload
464    Resolve(
465        PayloadId,
466        /* kind: */ PayloadKind,
467        oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
468    ),
469    /// Payload service events
470    Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
471}
472
473impl<T> fmt::Debug for PayloadServiceCommand<T>
474where
475    T: PayloadTypes,
476{
477    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
478        match self {
479            Self::BuildNewPayload(f0, f1) => {
480                f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
481            }
482            Self::BestPayload(f0, f1) => {
483                f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
484            }
485            Self::PayloadAttributes(f0, f1) => {
486                f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
487            }
488            Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
489            Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
490        }
491    }
492}