reth_payload_builder/
service.rs

1//! Support for building payloads.
2//!
3//! The payload builder is responsible for building payloads.
4//! Once a new payload is created, it is continuously updated.
5
6use crate::{
7    metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8    PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_rpc_types::engine::PayloadId;
12use futures_util::{future::FutureExt, Stream, StreamExt};
13use reth_chain_state::CanonStateNotification;
14use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
15use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
16use reth_primitives_traits::NodePrimitives;
17use std::{
18    fmt,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23};
24use tokio::sync::{
25    broadcast, mpsc,
26    oneshot::{self, Receiver},
27};
28use tokio_stream::wrappers::UnboundedReceiverStream;
29use tracing::{debug, info, trace, warn};
30
31type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
32
33/// A communication channel to the [`PayloadBuilderService`] that can retrieve payloads.
34///
35/// This type is intended to be used to retrieve payloads from the service (e.g. from the engine
36/// API).
37#[derive(Debug)]
38pub struct PayloadStore<T: PayloadTypes> {
39    inner: Arc<PayloadBuilderHandle<T>>,
40}
41
42impl<T> PayloadStore<T>
43where
44    T: PayloadTypes,
45{
46    /// Resolves the payload job and returns the best payload that has been built so far.
47    ///
48    /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
49    /// job, See [`PayloadJob::resolve`].
50    pub async fn resolve_kind(
51        &self,
52        id: PayloadId,
53        kind: PayloadKind,
54    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
55        self.inner.resolve_kind(id, kind).await
56    }
57
58    /// Resolves the payload job and returns the best payload that has been built so far.
59    pub async fn resolve(
60        &self,
61        id: PayloadId,
62    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
63        self.resolve_kind(id, PayloadKind::Earliest).await
64    }
65
66    /// Returns the best payload for the given identifier.
67    ///
68    /// Note: this merely returns the best payload so far and does not resolve the job.
69    pub async fn best_payload(
70        &self,
71        id: PayloadId,
72    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
73        self.inner.best_payload(id).await
74    }
75
76    /// Returns the payload attributes associated with the given identifier.
77    ///
78    /// Note: this returns the attributes of the payload and does not resolve the job.
79    pub async fn payload_attributes(
80        &self,
81        id: PayloadId,
82    ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
83        self.inner.payload_attributes(id).await
84    }
85}
86
87impl<T> PayloadStore<T>
88where
89    T: PayloadTypes,
90{
91    /// Create a new instance
92    pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
93        Self { inner: Arc::new(inner) }
94    }
95}
96
97impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
98where
99    T: PayloadTypes,
100{
101    fn from(inner: PayloadBuilderHandle<T>) -> Self {
102        Self::new(inner)
103    }
104}
105
106/// A communication channel to the [`PayloadBuilderService`].
107///
108/// This is the API used to create new payloads and to get the current state of existing ones.
109#[derive(Debug)]
110pub struct PayloadBuilderHandle<T: PayloadTypes> {
111    /// Sender half of the message channel to the [`PayloadBuilderService`].
112    to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
113}
114
115impl<T: PayloadTypes> PayloadBuilderHandle<T> {
116    /// Creates a new payload builder handle for the given channel.
117    ///
118    /// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload
119    /// building flow See [`PayloadBuilderService::poll`] for implementation details.
120    pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
121        Self { to_service }
122    }
123
124    /// Sends a message to the service to start building a new payload for the given payload.
125    ///
126    /// Returns a receiver that will receive the payload id.
127    pub fn send_new_payload(
128        &self,
129        attr: T::PayloadBuilderAttributes,
130    ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
131        let (tx, rx) = oneshot::channel();
132        let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
133        rx
134    }
135
136    /// Returns the best payload for the given identifier.
137    /// Note: this does not resolve the job if it's still in progress.
138    pub async fn best_payload(
139        &self,
140        id: PayloadId,
141    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
142        let (tx, rx) = oneshot::channel();
143        self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
144        rx.await.ok()?
145    }
146
147    /// Resolves the payload job and returns the best payload that has been built so far.
148    pub async fn resolve_kind(
149        &self,
150        id: PayloadId,
151        kind: PayloadKind,
152    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
153        let (tx, rx) = oneshot::channel();
154        self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
155        match rx.await.transpose()? {
156            Ok(fut) => Some(fut.await),
157            Err(e) => Some(Err(e.into())),
158        }
159    }
160
161    /// Sends a message to the service to subscribe to payload events.
162    /// Returns a receiver that will receive them.
163    pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
164        let (tx, rx) = oneshot::channel();
165        let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
166        Ok(PayloadEvents { receiver: rx.await? })
167    }
168
169    /// Returns the payload attributes associated with the given identifier.
170    ///
171    /// Note: this returns the attributes of the payload and does not resolve the job.
172    pub async fn payload_attributes(
173        &self,
174        id: PayloadId,
175    ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
176        let (tx, rx) = oneshot::channel();
177        self.to_service.send(PayloadServiceCommand::PayloadAttributes(id, tx)).ok()?;
178        rx.await.ok()?
179    }
180}
181
182impl<T> Clone for PayloadBuilderHandle<T>
183where
184    T: PayloadTypes,
185{
186    fn clone(&self) -> Self {
187        Self { to_service: self.to_service.clone() }
188    }
189}
190
191/// A service that manages payload building tasks.
192///
193/// This type is an endless future that manages the building of payloads.
194///
195/// It tracks active payloads and their build jobs that run in a worker pool.
196///
197/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
198/// does know nothing about how to build them, it just drives their jobs to completion.
199#[derive(Debug)]
200#[must_use = "futures do nothing unless you `.await` or poll them"]
201pub struct PayloadBuilderService<Gen, St, T>
202where
203    T: PayloadTypes,
204    Gen: PayloadJobGenerator,
205    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
206{
207    /// The type that knows how to create new payloads.
208    generator: Gen,
209    /// All active payload jobs.
210    payload_jobs: Vec<(Gen::Job, PayloadId)>,
211    /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
212    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
213    /// Receiver half of the command channel.
214    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
215    /// Metrics for the payload builder service
216    metrics: PayloadBuilderServiceMetrics,
217    /// Chain events notification stream
218    chain_events: St,
219    /// Payload events handler, used to broadcast and subscribe to payload events.
220    payload_events: broadcast::Sender<Events<T>>,
221}
222
/// Capacity handed to `broadcast::channel` when the payload events channel is created.
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
224
225// === impl PayloadBuilderService ===
226
227impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
228where
229    T: PayloadTypes,
230    Gen: PayloadJobGenerator,
231    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
232    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
233{
234    /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
235    /// with it.
236    ///
237    /// This also takes a stream of chain events that will be forwarded to the generator to apply
238    /// additional logic when new state is committed. See also
239    /// [`PayloadJobGenerator::on_new_state`].
240    pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
241        let (service_tx, command_rx) = mpsc::unbounded_channel();
242        let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
243
244        let service = Self {
245            generator,
246            payload_jobs: Vec::new(),
247            service_tx,
248            command_rx: UnboundedReceiverStream::new(command_rx),
249            metrics: Default::default(),
250            chain_events,
251            payload_events,
252        };
253
254        let handle = service.handle();
255        (service, handle)
256    }
257
258    /// Returns a handle to the service.
259    pub fn handle(&self) -> PayloadBuilderHandle<T> {
260        PayloadBuilderHandle::new(self.service_tx.clone())
261    }
262
263    /// Returns true if the given payload is currently being built.
264    fn contains_payload(&self, id: PayloadId) -> bool {
265        self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
266    }
267
268    /// Returns the best payload for the given identifier that has been built so far.
269    fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
270        let res = self
271            .payload_jobs
272            .iter()
273            .find(|(_, job_id)| *job_id == id)
274            .map(|(j, _)| j.best_payload().map(|p| p.into()));
275        if let Some(Ok(ref best)) = res {
276            self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
277        }
278
279        res
280    }
281
282    /// Returns the best payload for the given identifier that has been built so far and terminates
283    /// the job if requested.
284    fn resolve(
285        &mut self,
286        id: PayloadId,
287        kind: PayloadKind,
288    ) -> Option<PayloadFuture<T::BuiltPayload>> {
289        trace!(%id, "resolving payload job");
290
291        let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
292        let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
293
294        if keep_alive == KeepPayloadJobAlive::No {
295            let (_, id) = self.payload_jobs.swap_remove(job);
296            trace!(%id, "terminated resolved job");
297        }
298
299        // Since the fees will not be known until the payload future is resolved / awaited, we wrap
300        // the future in a new future that will update the metrics.
301        let resolved_metrics = self.metrics.clone();
302        let payload_events = self.payload_events.clone();
303
304        let fut = async move {
305            let res = fut.await;
306            if let Ok(ref payload) = res {
307                payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
308
309                resolved_metrics
310                    .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
311            }
312            res.map(|p| p.into())
313        };
314
315        Some(Box::pin(fut))
316    }
317}
318
319impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
320where
321    T: PayloadTypes,
322    Gen: PayloadJobGenerator,
323    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
324    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
325{
326    /// Returns the payload attributes for the given payload.
327    fn payload_attributes(
328        &self,
329        id: PayloadId,
330    ) -> Option<Result<<Gen::Job as PayloadJob>::PayloadAttributes, PayloadBuilderError>> {
331        let attributes = self
332            .payload_jobs
333            .iter()
334            .find(|(_, job_id)| *job_id == id)
335            .map(|(j, _)| j.payload_attributes());
336
337        if attributes.is_none() {
338            trace!(%id, "no matching payload job found to get attributes for");
339        }
340
341        attributes
342    }
343}
344
345impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
346where
347    T: PayloadTypes,
348    N: NodePrimitives,
349    Gen: PayloadJobGenerator + Unpin + 'static,
350    <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
351    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
352    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
353    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
354{
355    type Output = ();
356
357    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
358        let this = self.get_mut();
359        loop {
360            // notify the generator of new chain events
361            while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
362                this.generator.on_new_state(new_head);
363            }
364
365            // we poll all jobs first, so we always have the latest payload that we can report if
366            // requests
367            // we don't care about the order of the jobs, so we can just swap_remove them
368            for idx in (0..this.payload_jobs.len()).rev() {
369                let (mut job, id) = this.payload_jobs.swap_remove(idx);
370
371                // drain better payloads from the job
372                match job.poll_unpin(cx) {
373                    Poll::Ready(Ok(_)) => {
374                        this.metrics.set_active_jobs(this.payload_jobs.len());
375                        trace!(%id, "payload job finished");
376                    }
377                    Poll::Ready(Err(err)) => {
378                        warn!(%err, ?id, "Payload builder job failed; resolving payload");
379                        this.metrics.inc_failed_jobs();
380                        this.metrics.set_active_jobs(this.payload_jobs.len());
381                    }
382                    Poll::Pending => {
383                        // still pending, put it back
384                        this.payload_jobs.push((job, id));
385                    }
386                }
387            }
388
389            // marker for exit condition
390            let mut new_job = false;
391
392            // drain all requests
393            while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
394                match cmd {
395                    PayloadServiceCommand::BuildNewPayload(attr, tx) => {
396                        let id = attr.payload_id();
397                        let mut res = Ok(id);
398
399                        if this.contains_payload(id) {
400                            debug!(%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
401                        } else {
402                            // no job for this payload yet, create one
403                            let parent = attr.parent();
404                            match this.generator.new_payload_job(attr.clone()) {
405                                Ok(job) => {
406                                    info!(%id, %parent, "New payload job created");
407                                    this.metrics.inc_initiated_jobs();
408                                    new_job = true;
409                                    this.payload_jobs.push((job, id));
410                                    this.payload_events.send(Events::Attributes(attr.clone())).ok();
411                                }
412                                Err(err) => {
413                                    this.metrics.inc_failed_jobs();
414                                    warn!(%err, %id, "Failed to create payload builder job");
415                                    res = Err(err);
416                                }
417                            }
418                        }
419
420                        // return the id of the payload
421                        let _ = tx.send(res);
422                    }
423                    PayloadServiceCommand::BestPayload(id, tx) => {
424                        let _ = tx.send(this.best_payload(id));
425                    }
426                    PayloadServiceCommand::PayloadAttributes(id, tx) => {
427                        let attributes = this.payload_attributes(id);
428                        let _ = tx.send(attributes);
429                    }
430                    PayloadServiceCommand::Resolve(id, strategy, tx) => {
431                        let _ = tx.send(this.resolve(id, strategy));
432                    }
433                    PayloadServiceCommand::Subscribe(tx) => {
434                        let new_rx = this.payload_events.subscribe();
435                        let _ = tx.send(new_rx);
436                    }
437                }
438            }
439
440            if !new_job {
441                return Poll::Pending
442            }
443        }
444    }
445}
446
447/// Message type for the [`PayloadBuilderService`].
448pub enum PayloadServiceCommand<T: PayloadTypes> {
449    /// Start building a new payload.
450    BuildNewPayload(
451        T::PayloadBuilderAttributes,
452        oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
453    ),
454    /// Get the best payload so far
455    BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
456    /// Get the payload attributes for the given payload
457    PayloadAttributes(
458        PayloadId,
459        oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
460    ),
461    /// Resolve the payload and return the payload
462    Resolve(
463        PayloadId,
464        /* kind: */ PayloadKind,
465        oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
466    ),
467    /// Payload service events
468    Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
469}
470
471impl<T> fmt::Debug for PayloadServiceCommand<T>
472where
473    T: PayloadTypes,
474{
475    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
476        match self {
477            Self::BuildNewPayload(f0, f1) => {
478                f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
479            }
480            Self::BestPayload(f0, f1) => {
481                f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
482            }
483            Self::PayloadAttributes(f0, f1) => {
484                f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
485            }
486            Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
487            Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
488        }
489    }
490}