reth_payload_builder/
service.rs

//! Support for building payloads.
//!
//! The payload builder is responsible for building payloads.
//! Once a new payload is created, it is continuously updated.
5
6use crate::{
7    metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8    PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::BlockTimestamp;
12use alloy_rpc_types::engine::PayloadId;
13use futures_util::{future::FutureExt, Stream, StreamExt};
14use reth_chain_state::CanonStateNotification;
15use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
16use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
17use reth_primitives_traits::NodePrimitives;
18use std::{
19    fmt,
20    future::Future,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24};
25use tokio::sync::{
26    broadcast, mpsc,
27    oneshot::{self, Receiver},
28    watch,
29};
30use tokio_stream::wrappers::UnboundedReceiverStream;
31use tracing::{debug, info, trace, warn};
32
/// A boxed, `Send` future that resolves to a built payload or a [`PayloadBuilderError`].
type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
34
/// A communication channel to the [`PayloadBuilderService`] that can retrieve payloads.
///
/// This type is intended to be used to retrieve payloads from the service (e.g. from the engine
/// API).
#[derive(Debug)]
pub struct PayloadStore<T: PayloadTypes> {
    /// Shared handle through which every request is forwarded to the service.
    inner: Arc<PayloadBuilderHandle<T>>,
}
43
44impl<T> PayloadStore<T>
45where
46    T: PayloadTypes,
47{
48    /// Resolves the payload job and returns the best payload that has been built so far.
49    ///
50    /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
51    /// job, See [`PayloadJob::resolve`].
52    pub async fn resolve_kind(
53        &self,
54        id: PayloadId,
55        kind: PayloadKind,
56    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
57        self.inner.resolve_kind(id, kind).await
58    }
59
60    /// Resolves the payload job and returns the best payload that has been built so far.
61    pub async fn resolve(
62        &self,
63        id: PayloadId,
64    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
65        self.resolve_kind(id, PayloadKind::Earliest).await
66    }
67
68    /// Returns the best payload for the given identifier.
69    ///
70    /// Note: this merely returns the best payload so far and does not resolve the job.
71    pub async fn best_payload(
72        &self,
73        id: PayloadId,
74    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
75        self.inner.best_payload(id).await
76    }
77
78    /// Returns the payload timestamp associated with the given identifier.
79    ///
80    /// Note: this returns the timestamp of the payload and does not resolve the job.
81    pub async fn payload_timestamp(
82        &self,
83        id: PayloadId,
84    ) -> Option<Result<u64, PayloadBuilderError>> {
85        self.inner.payload_timestamp(id).await
86    }
87}
88
89impl<T> PayloadStore<T>
90where
91    T: PayloadTypes,
92{
93    /// Create a new instance
94    pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
95        Self { inner: Arc::new(inner) }
96    }
97}
98
99impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
100where
101    T: PayloadTypes,
102{
103    fn from(inner: PayloadBuilderHandle<T>) -> Self {
104        Self::new(inner)
105    }
106}
107
/// A communication channel to the [`PayloadBuilderService`].
///
/// This is the API used to create new payloads and to get the current state of existing ones.
/// Every method sends a [`PayloadServiceCommand`] over an unbounded channel to the service.
#[derive(Debug)]
pub struct PayloadBuilderHandle<T: PayloadTypes> {
    /// Sender half of the message channel to the [`PayloadBuilderService`].
    to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
}
116
117impl<T: PayloadTypes> PayloadBuilderHandle<T> {
118    /// Creates a new payload builder handle for the given channel.
119    ///
120    /// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload
121    /// building flow See [`PayloadBuilderService::poll`] for implementation details.
122    pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
123        Self { to_service }
124    }
125
126    /// Sends a message to the service to start building a new payload for the given payload.
127    ///
128    /// Returns a receiver that will receive the payload id.
129    pub fn send_new_payload(
130        &self,
131        attr: T::PayloadBuilderAttributes,
132    ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
133        let (tx, rx) = oneshot::channel();
134        let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
135        rx
136    }
137
138    /// Returns the best payload for the given identifier.
139    /// Note: this does not resolve the job if it's still in progress.
140    pub async fn best_payload(
141        &self,
142        id: PayloadId,
143    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
144        let (tx, rx) = oneshot::channel();
145        self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
146        rx.await.ok()?
147    }
148
149    /// Resolves the payload job and returns the best payload that has been built so far.
150    pub async fn resolve_kind(
151        &self,
152        id: PayloadId,
153        kind: PayloadKind,
154    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
155        let (tx, rx) = oneshot::channel();
156        self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
157        match rx.await.transpose()? {
158            Ok(fut) => Some(fut.await),
159            Err(e) => Some(Err(e.into())),
160        }
161    }
162
163    /// Sends a message to the service to subscribe to payload events.
164    /// Returns a receiver that will receive them.
165    pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
166        let (tx, rx) = oneshot::channel();
167        let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
168        Ok(PayloadEvents { receiver: rx.await? })
169    }
170
171    /// Returns the payload timestamp associated with the given identifier.
172    ///
173    /// Note: this returns the timestamp of the payload and does not resolve the job.
174    pub async fn payload_timestamp(
175        &self,
176        id: PayloadId,
177    ) -> Option<Result<u64, PayloadBuilderError>> {
178        let (tx, rx) = oneshot::channel();
179        self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
180        rx.await.ok()?
181    }
182}
183
184impl<T> Clone for PayloadBuilderHandle<T>
185where
186    T: PayloadTypes,
187{
188    fn clone(&self) -> Self {
189        Self { to_service: self.to_service.clone() }
190    }
191}
192
/// A service that manages payload building tasks.
///
/// This type is an endless future that manages the building of payloads.
///
/// It tracks active payloads and their build jobs that run in a worker pool.
///
/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
/// knows nothing about how to build them, it just drives their jobs to completion.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PayloadBuilderService<Gen, St, T>
where
    T: PayloadTypes,
    Gen: PayloadJobGenerator,
    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
{
    /// The type that knows how to create new payloads.
    generator: Gen,
    /// All active payload jobs, each paired with the id of the payload it builds.
    payload_jobs: Vec<(Gen::Job, PayloadId)>,
    /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
    /// Receiver half of the command channel.
    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
    /// Metrics for the payload builder service.
    metrics: PayloadBuilderServiceMetrics,
    /// Chain events notification stream.
    chain_events: St,
    /// Payload events handler, used to broadcast and subscribe to payload events.
    payload_events: broadcast::Sender<Events<T>>,
    /// We retain the latest resolved payload just to make sure that we can handle repeated
    /// requests for it gracefully.
    cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
    /// Sender half of the cached payload channel.
    cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
}
229
/// Capacity of the broadcast channel that carries payload events.
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
231
232// === impl PayloadBuilderService ===
233
234impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
235where
236    T: PayloadTypes,
237    Gen: PayloadJobGenerator,
238    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
239    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
240{
241    /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
242    /// with it.
243    ///
244    /// This also takes a stream of chain events that will be forwarded to the generator to apply
245    /// additional logic when new state is committed. See also
246    /// [`PayloadJobGenerator::on_new_state`].
247    pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
248        let (service_tx, command_rx) = mpsc::unbounded_channel();
249        let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
250
251        let (cached_payload_tx, cached_payload_rx) = watch::channel(None);
252
253        let service = Self {
254            generator,
255            payload_jobs: Vec::new(),
256            service_tx,
257            command_rx: UnboundedReceiverStream::new(command_rx),
258            metrics: Default::default(),
259            chain_events,
260            payload_events,
261            cached_payload_rx,
262            cached_payload_tx,
263        };
264
265        let handle = service.handle();
266        (service, handle)
267    }
268
269    /// Returns a handle to the service.
270    pub fn handle(&self) -> PayloadBuilderHandle<T> {
271        PayloadBuilderHandle::new(self.service_tx.clone())
272    }
273
274    /// Create clone on `payload_events` sending handle that could be used by builder to produce
275    /// additional events during block building
276    pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
277        self.payload_events.clone()
278    }
279
280    /// Returns true if the given payload is currently being built.
281    fn contains_payload(&self, id: PayloadId) -> bool {
282        self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
283    }
284
285    /// Returns the best payload for the given identifier that has been built so far.
286    fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
287        let res = self
288            .payload_jobs
289            .iter()
290            .find(|(_, job_id)| *job_id == id)
291            .map(|(j, _)| j.best_payload().map(|p| p.into()));
292        if let Some(Ok(ref best)) = res {
293            self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
294        }
295
296        res
297    }
298
299    /// Returns the best payload for the given identifier that has been built so far and terminates
300    /// the job if requested.
301    fn resolve(
302        &mut self,
303        id: PayloadId,
304        kind: PayloadKind,
305    ) -> Option<PayloadFuture<T::BuiltPayload>> {
306        debug!(target: "payload_builder", %id, "resolving payload job");
307
308        if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() {
309            if *cached == id {
310                return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
311            }
312        }
313
314        let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
315        let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
316        let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
317
318        if keep_alive == KeepPayloadJobAlive::No {
319            let (_, id) = self.payload_jobs.swap_remove(job);
320            debug!(target: "payload_builder", %id, "terminated resolved job");
321        }
322
323        // Since the fees will not be known until the payload future is resolved / awaited, we wrap
324        // the future in a new future that will update the metrics.
325        let resolved_metrics = self.metrics.clone();
326        let payload_events = self.payload_events.clone();
327        let cached_payload_tx = self.cached_payload_tx.clone();
328
329        let fut = async move {
330            let res = fut.await;
331            if let Ok(payload) = &res {
332                if payload_events.receiver_count() > 0 {
333                    payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
334                }
335
336                if let Ok(timestamp) = payload_timestamp {
337                    let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
338                }
339
340                resolved_metrics
341                    .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
342            }
343            res.map(|p| p.into())
344        };
345
346        Some(Box::pin(fut))
347    }
348}
349
350impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
351where
352    T: PayloadTypes,
353    Gen: PayloadJobGenerator,
354    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
355    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
356{
357    /// Returns the payload timestamp for the given payload.
358    fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
359        if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() {
360            if cached_id == id {
361                return Some(Ok(timestamp));
362            }
363        }
364
365        let timestamp = self
366            .payload_jobs
367            .iter()
368            .find(|(_, job_id)| *job_id == id)
369            .map(|(j, _)| j.payload_timestamp());
370
371        if timestamp.is_none() {
372            trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
373        }
374
375        timestamp
376    }
377}
378
379impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
380where
381    T: PayloadTypes,
382    N: NodePrimitives,
383    Gen: PayloadJobGenerator + Unpin + 'static,
384    <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
385    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
386    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
387    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
388{
389    type Output = ();
390
391    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
392        let this = self.get_mut();
393        loop {
394            // notify the generator of new chain events
395            while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
396                this.generator.on_new_state(new_head);
397            }
398
399            // we poll all jobs first, so we always have the latest payload that we can report if
400            // requests
401            // we don't care about the order of the jobs, so we can just swap_remove them
402            for idx in (0..this.payload_jobs.len()).rev() {
403                let (mut job, id) = this.payload_jobs.swap_remove(idx);
404
405                // drain better payloads from the job
406                match job.poll_unpin(cx) {
407                    Poll::Ready(Ok(_)) => {
408                        this.metrics.set_active_jobs(this.payload_jobs.len());
409                        trace!(target: "payload_builder", %id, "payload job finished");
410                    }
411                    Poll::Ready(Err(err)) => {
412                        warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
413                        this.metrics.inc_failed_jobs();
414                        this.metrics.set_active_jobs(this.payload_jobs.len());
415                    }
416                    Poll::Pending => {
417                        // still pending, put it back
418                        this.payload_jobs.push((job, id));
419                    }
420                }
421            }
422
423            // marker for exit condition
424            let mut new_job = false;
425
426            // drain all requests
427            while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
428                match cmd {
429                    PayloadServiceCommand::BuildNewPayload(attr, tx) => {
430                        let id = attr.payload_id();
431                        let mut res = Ok(id);
432
433                        if this.contains_payload(id) {
434                            debug!(target: "payload_builder",%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
435                        } else {
436                            // no job for this payload yet, create one
437                            let parent = attr.parent();
438                            match this.generator.new_payload_job(attr.clone()) {
439                                Ok(job) => {
440                                    info!(target: "payload_builder", %id, %parent, "New payload job created");
441                                    this.metrics.inc_initiated_jobs();
442                                    new_job = true;
443                                    this.payload_jobs.push((job, id));
444                                    this.payload_events.send(Events::Attributes(attr.clone())).ok();
445                                }
446                                Err(err) => {
447                                    this.metrics.inc_failed_jobs();
448                                    warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
449                                    res = Err(err);
450                                }
451                            }
452                        }
453
454                        // return the id of the payload
455                        let _ = tx.send(res);
456                    }
457                    PayloadServiceCommand::BestPayload(id, tx) => {
458                        let _ = tx.send(this.best_payload(id));
459                    }
460                    PayloadServiceCommand::PayloadTimestamp(id, tx) => {
461                        let timestamp = this.payload_timestamp(id);
462                        let _ = tx.send(timestamp);
463                    }
464                    PayloadServiceCommand::Resolve(id, strategy, tx) => {
465                        let _ = tx.send(this.resolve(id, strategy));
466                    }
467                    PayloadServiceCommand::Subscribe(tx) => {
468                        let new_rx = this.payload_events.subscribe();
469                        let _ = tx.send(new_rx);
470                    }
471                }
472            }
473
474            if !new_job {
475                return Poll::Pending
476            }
477        }
478    }
479}
480
/// Message type for the [`PayloadBuilderService`].
///
/// Each variant carries a oneshot sender on which the service sends its reply.
pub enum PayloadServiceCommand<T: PayloadTypes> {
    /// Start building a new payload.
    BuildNewPayload(
        T::PayloadBuilderAttributes,
        oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
    ),
    /// Get the best payload so far
    BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
    /// Get the payload timestamp for the given payload
    PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
    /// Resolve the payload and return the payload
    ///
    /// The reply is a future that yields the payload, not the payload itself.
    Resolve(
        PayloadId,
        /* kind: */ PayloadKind,
        oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
    ),
    /// Payload service events
    Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
}
501
502impl<T> fmt::Debug for PayloadServiceCommand<T>
503where
504    T: PayloadTypes,
505{
506    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
507        match self {
508            Self::BuildNewPayload(f0, f1) => {
509                f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
510            }
511            Self::BestPayload(f0, f1) => {
512                f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
513            }
514            Self::PayloadTimestamp(f0, f1) => {
515                f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
516            }
517            Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
518            Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
519        }
520    }
521}