Skip to main content

reth_payload_builder/
service.rs

1//! Support for building payloads.
2//!
3//! The payload builder is responsible for building payloads.
4//! Once a new payload is created, it is continuously updated.
5
6use crate::{
7    metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8    PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{BlockTimestamp, B256};
12use alloy_rpc_types::engine::PayloadId;
13use futures_util::{future::FutureExt, Stream, StreamExt};
14use reth_chain_state::CanonStateNotification;
15use reth_execution_cache::SavedCache;
16use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
17use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind, PayloadTypes};
18use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
19use reth_trie_parallel::state_root_task::StateRootHandle;
20use std::{
21    future::Future,
22    pin::Pin,
23    sync::Arc,
24    task::{Context, Poll},
25};
26use tokio::sync::{
27    broadcast, mpsc,
28    oneshot::{self, Receiver},
29    watch,
30};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32use tracing::{debug, debug_span, info, trace, warn, Span};
33
34type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
35
36/// A communication channel to the [`PayloadBuilderService`] that can retrieve payloads.
37///
38/// This type is intended to be used to retrieve payloads from the service (e.g. from the engine
39/// API).
40#[derive(Debug)]
41pub struct PayloadStore<T: PayloadTypes> {
42    inner: Arc<PayloadBuilderHandle<T>>,
43}
44
45impl<T> PayloadStore<T>
46where
47    T: PayloadTypes,
48{
49    /// Resolves the payload job and returns the best payload that has been built so far.
50    ///
51    /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
52    /// job, See [`PayloadJob::resolve`].
53    pub async fn resolve_kind(
54        &self,
55        id: PayloadId,
56        kind: PayloadKind,
57    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
58        self.inner.resolve_kind(id, kind).await
59    }
60
61    /// Resolves the payload job and returns the best payload that has been built so far.
62    pub async fn resolve(
63        &self,
64        id: PayloadId,
65    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
66        self.resolve_kind(id, PayloadKind::Earliest).await
67    }
68
69    /// Returns the best payload for the given identifier.
70    ///
71    /// Note: this merely returns the best payload so far and does not resolve the job.
72    pub async fn best_payload(
73        &self,
74        id: PayloadId,
75    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
76        self.inner.best_payload(id).await
77    }
78
79    /// Returns the payload timestamp associated with the given identifier.
80    ///
81    /// Note: this returns the timestamp of the payload and does not resolve the job.
82    pub async fn payload_timestamp(
83        &self,
84        id: PayloadId,
85    ) -> Option<Result<u64, PayloadBuilderError>> {
86        self.inner.payload_timestamp(id).await
87    }
88
89    /// Create a new instance
90    pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
91        Self { inner: Arc::new(inner) }
92    }
93}
94
95impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
96where
97    T: PayloadTypes,
98{
99    fn from(inner: PayloadBuilderHandle<T>) -> Self {
100        Self::new(inner)
101    }
102}
103
104/// A communication channel to the [`PayloadBuilderService`].
105///
106/// This is the API used to create new payloads and to get the current state of existing ones.
107#[derive(Debug)]
108pub struct PayloadBuilderHandle<T: PayloadTypes> {
109    /// Sender half of the message channel to the [`PayloadBuilderService`].
110    to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
111}
112
113impl<T: PayloadTypes> PayloadBuilderHandle<T> {
114    /// Creates a new payload builder handle for the given channel.
115    ///
116    /// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload
117    /// building flow See [`PayloadBuilderService::poll`] for implementation details.
118    pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
119        Self { to_service }
120    }
121
122    /// Sends a message to the service to start building a new payload for the given payload.
123    ///
124    /// Returns a receiver that will receive the payload id.
125    pub fn send_new_payload(
126        &self,
127        input: BuildNewPayload<T::PayloadAttributes>,
128    ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
129        let (tx, rx) = oneshot::channel();
130        let span = debug_span!(parent: Span::current(), "payload_job");
131        let _ =
132            self.to_service.send(PayloadServiceCommand::BuildNewPayload(input.into(), span, tx));
133        rx
134    }
135
136    /// Returns the best payload for the given identifier.
137    /// Note: this does not resolve the job if it's still in progress.
138    pub async fn best_payload(
139        &self,
140        id: PayloadId,
141    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
142        let (tx, rx) = oneshot::channel();
143        self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
144        rx.await.ok()?
145    }
146
147    /// Resolves the payload job and returns the best payload that has been built so far.
148    pub async fn resolve_kind(
149        &self,
150        id: PayloadId,
151        kind: PayloadKind,
152    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
153        let (tx, rx) = oneshot::channel();
154        self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
155        match rx.await.transpose()? {
156            Ok(fut) => Some(fut.await),
157            Err(e) => Some(Err(e.into())),
158        }
159    }
160
161    /// Same as [`Self::resolve_kind`] but returns the underlying future.
162    pub async fn resolve_kind_fut(
163        &self,
164        id: PayloadId,
165        kind: PayloadKind,
166    ) -> Result<Option<PayloadFuture<T::BuiltPayload>>, PayloadBuilderError> {
167        let (tx, rx) = oneshot::channel();
168        self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx))?;
169        rx.await.map_err(Into::into)
170    }
171
172    /// Sends a message to the service to subscribe to payload events.
173    /// Returns a receiver that will receive them.
174    pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
175        let (tx, rx) = oneshot::channel();
176        let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
177        Ok(PayloadEvents { receiver: rx.await? })
178    }
179
180    /// Returns the payload timestamp associated with the given identifier.
181    ///
182    /// Note: this returns the timestamp of the payload and does not resolve the job.
183    pub async fn payload_timestamp(
184        &self,
185        id: PayloadId,
186    ) -> Option<Result<u64, PayloadBuilderError>> {
187        let (tx, rx) = oneshot::channel();
188        self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
189        rx.await.ok()?
190    }
191}
192
193impl<T> Clone for PayloadBuilderHandle<T>
194where
195    T: PayloadTypes,
196{
197    fn clone(&self) -> Self {
198        Self { to_service: self.to_service.clone() }
199    }
200}
201
202/// A service that manages payload building tasks.
203///
204/// This type is an endless future that manages the building of payloads.
205///
206/// It tracks active payloads and their build jobs that run in a worker pool.
207///
208/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
209/// does know nothing about how to build them, it just drives their jobs to completion.
210#[derive(Debug)]
211#[must_use = "futures do nothing unless you `.await` or poll them"]
212pub struct PayloadBuilderService<Gen, St, T>
213where
214    T: PayloadTypes,
215    Gen: PayloadJobGenerator,
216    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
217{
218    /// The type that knows how to create new payloads.
219    generator: Gen,
220    /// All active payload jobs, each accompanied by its id and the caller's tracing span
221    /// propagated across the channel so that poll and resolve work appears as children of the
222    /// original Engine API request.
223    payload_jobs: Vec<(Gen::Job, PayloadId, Span)>,
224    /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
225    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
226    /// Receiver half of the command channel.
227    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
228    /// Metrics for the payload builder service
229    metrics: PayloadBuilderServiceMetrics,
230    /// Chain events notification stream
231    chain_events: St,
232    /// Payload events handler, used to broadcast and subscribe to payload events.
233    payload_events: broadcast::Sender<Events<T>>,
234    /// We retain latest resolved payload just to make sure that we can handle repeating
235    /// requests for it gracefully.
236    cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
237    /// Sender half of the cached payload channel.
238    cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
239}
240
241const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
242
243// === impl PayloadBuilderService ===
244
245impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
246where
247    T: PayloadTypes,
248    Gen: PayloadJobGenerator,
249    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
250    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
251{
252    /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
253    /// with it.
254    ///
255    /// This also takes a stream of chain events that will be forwarded to the generator to apply
256    /// additional logic when new state is committed. See also
257    /// [`PayloadJobGenerator::on_new_state`].
258    pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
259        let (service_tx, command_rx) = mpsc::unbounded_channel();
260        let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
261
262        let (cached_payload_tx, cached_payload_rx) = watch::channel(None);
263
264        let service = Self {
265            generator,
266            payload_jobs: Vec::new(),
267            service_tx,
268            command_rx: UnboundedReceiverStream::new(command_rx),
269            metrics: Default::default(),
270            chain_events,
271            payload_events,
272            cached_payload_rx,
273            cached_payload_tx,
274        };
275
276        let handle = service.handle();
277        (service, handle)
278    }
279
280    /// Returns a handle to the service.
281    pub fn handle(&self) -> PayloadBuilderHandle<T> {
282        PayloadBuilderHandle::new(self.service_tx.clone())
283    }
284
285    /// Create clone on `payload_events` sending handle that could be used by builder to produce
286    /// additional events during block building
287    pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
288        self.payload_events.clone()
289    }
290
291    /// Returns true if the given payload is currently being built.
292    fn contains_payload(&self, id: PayloadId) -> bool {
293        self.payload_jobs.iter().any(|(_, job_id, _)| *job_id == id)
294    }
295
296    /// Returns the best payload for the given identifier that has been built so far.
297    fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
298        let res = self
299            .payload_jobs
300            .iter()
301            .find(|(_, job_id, _)| *job_id == id)
302            .map(|(j, _, _)| j.best_payload().map(|p| p.into()));
303        if let Some(Ok(ref best)) = res {
304            self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
305        }
306
307        res
308    }
309
310    /// Returns the best payload for the given identifier that has been built so far and terminates
311    /// the job if requested.
312    fn resolve(
313        &mut self,
314        id: PayloadId,
315        kind: PayloadKind,
316    ) -> Option<PayloadFuture<T::BuiltPayload>> {
317        let start = Instant::now();
318        debug!(target: "payload_builder", %id, "resolving payload job");
319
320        if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() &&
321            *cached == id
322        {
323            self.metrics.resolve_duration_seconds.record(start.elapsed());
324            return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
325        }
326
327        let job = self.payload_jobs.iter().position(|(_, job_id, _)| *job_id == id)?;
328        let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
329        let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
330
331        if keep_alive == KeepPayloadJobAlive::No {
332            let (_, id, _) = self.payload_jobs.swap_remove(job);
333            debug!(target: "payload_builder", %id, "terminated resolved job");
334        }
335
336        // Since the fees will not be known until the payload future is resolved / awaited, we wrap
337        // the future in a new future that will update the metrics.
338        let resolved_metrics = self.metrics.clone();
339        let payload_events = self.payload_events.clone();
340        let cached_payload_tx = self.cached_payload_tx.clone();
341
342        let fut = async move {
343            let res = fut.await;
344            resolved_metrics.resolve_duration_seconds.record(start.elapsed());
345            if let Ok(payload) = &res {
346                if payload_events.receiver_count() > 0 {
347                    payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
348                }
349
350                if let Ok(timestamp) = payload_timestamp {
351                    let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
352                }
353
354                resolved_metrics
355                    .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
356            }
357            res.map(|p| p.into())
358        };
359
360        Some(Box::pin(fut))
361    }
362
363    /// Returns the payload timestamp for the given payload.
364    fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
365        if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() &&
366            cached_id == id
367        {
368            return Some(Ok(timestamp));
369        }
370
371        let timestamp = self
372            .payload_jobs
373            .iter()
374            .find(|(_, job_id, _)| *job_id == id)
375            .map(|(j, _, _)| j.payload_timestamp());
376
377        if timestamp.is_none() {
378            trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
379        }
380
381        timestamp
382    }
383}
384
385impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
386where
387    T: PayloadTypes,
388    N: NodePrimitives,
389    Gen: PayloadJobGenerator + Unpin + 'static,
390    <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
391    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
392    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
393    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
394{
395    type Output = ();
396
397    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398        let this = self.get_mut();
399        loop {
400            // notify the generator of new chain events
401            while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
402                this.generator.on_new_state(new_head);
403            }
404
405            // we poll all jobs first, so we always have the latest payload that we can report if
406            // requests
407            // we don't care about the order of the jobs, so we can just swap_remove them
408            for idx in (0..this.payload_jobs.len()).rev() {
409                let (mut job, id, job_span) = this.payload_jobs.swap_remove(idx);
410
411                let poll_result = {
412                    let _entered = job_span.enter();
413                    job.poll_unpin(cx)
414                };
415
416                match poll_result {
417                    Poll::Ready(Ok(_)) => {
418                        this.metrics.set_active_jobs(this.payload_jobs.len());
419                        trace!(target: "payload_builder", %id, "payload job finished");
420                    }
421                    Poll::Ready(Err(err)) => {
422                        warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
423                        this.metrics.inc_failed_jobs();
424                        this.metrics.set_active_jobs(this.payload_jobs.len());
425                    }
426                    Poll::Pending => {
427                        this.payload_jobs.push((job, id, job_span));
428                    }
429                }
430            }
431
432            // marker for exit condition
433            let mut new_job = false;
434
435            // drain all requests
436            while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
437                match cmd {
438                    PayloadServiceCommand::BuildNewPayload(input, job_span, tx) => {
439                        let id = input.payload_id();
440                        let mut res = Ok(id);
441                        let parent = input.parent_hash;
442
443                        if this.contains_payload(id) {
444                            debug!(target: "payload_builder", %id, %parent, "Payload job already in progress, ignoring.");
445                        } else {
446                            let start = Instant::now();
447                            let attributes = input.attributes.clone();
448                            let job_result = {
449                                let _entered = job_span.enter();
450                                this.generator.new_payload_job(*input, id)
451                            };
452
453                            match job_result {
454                                Ok(job) => {
455                                    this.metrics.new_job_duration_seconds.record(start.elapsed());
456                                    info!(target: "payload_builder", %id, %parent, "New payload job created");
457                                    this.metrics.inc_initiated_jobs();
458                                    new_job = true;
459                                    this.payload_jobs.push((job, id, job_span));
460                                    this.payload_events.send(Events::Attributes(attributes)).ok();
461
462                                    // Clear stale cached payload for this id so
463                                    // resolve() never returns an outdated result
464                                    // from a previous job with the same id.
465                                    if this
466                                        .cached_payload_rx
467                                        .borrow()
468                                        .as_ref()
469                                        .is_some_and(|(cached_id, _, _)| *cached_id == id)
470                                    {
471                                        trace!(target: "payload_builder", %id, "clearing stale cached payload for reused payload id");
472                                        let _ = this.cached_payload_tx.send(None);
473                                    }
474                                }
475                                Err(err) => {
476                                    this.metrics.new_job_duration_seconds.record(start.elapsed());
477                                    this.metrics.inc_failed_jobs();
478                                    warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
479                                    res = Err(err);
480                                }
481                            }
482                        }
483
484                        let _ = tx.send(res);
485                    }
486                    PayloadServiceCommand::BestPayload(id, tx) => {
487                        let _ = tx.send(this.best_payload(id));
488                    }
489                    PayloadServiceCommand::PayloadTimestamp(id, tx) => {
490                        let timestamp = this.payload_timestamp(id);
491                        let _ = tx.send(timestamp);
492                    }
493                    PayloadServiceCommand::Resolve(id, strategy, tx) => {
494                        let _ = tx.send(this.resolve(id, strategy));
495                    }
496                    PayloadServiceCommand::Subscribe(tx) => {
497                        let new_rx = this.payload_events.subscribe();
498                        let _ = tx.send(new_rx);
499                    }
500                }
501            }
502
503            if !new_job {
504                return Poll::Pending
505            }
506        }
507    }
508}
509
510/// Message type for the [`PayloadBuilderService`].
511#[derive(derive_more::Debug)]
512pub enum PayloadServiceCommand<T: PayloadTypes> {
513    /// Start building a new payload.
514    ///
515    /// Carries the caller's [`Span`] so the service can parent payload-building work under the
516    /// originating Engine API trace.
517    BuildNewPayload(
518        Box<BuildNewPayload<T::PayloadAttributes>>,
519        Span,
520        oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
521    ),
522    /// Get the best payload so far
523    BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
524    /// Get the payload timestamp for the given payload
525    PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
526    /// Resolve the payload and return the payload
527    Resolve(
528        PayloadId,
529        /* kind: */ PayloadKind,
530        #[debug(skip)] oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
531    ),
532    /// Payload service events
533    Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
534}
535
536/// A request to build a new payload.
537#[derive(Debug)]
538pub struct BuildNewPayload<T> {
539    /// The attributes for the new payload
540    pub attributes: T,
541    /// The parent hash of the new payload
542    pub parent_hash: B256,
543    /// Optional execution cache to use for the payload.
544    ///
545    /// Only provided if `--engine.share-execution-cache-with-payload-builder` is enabled.
546    pub cache: Option<SavedCache>,
547    /// Optional handle to a background sparse trie task.
548    pub trie_handle: Option<StateRootHandle>,
549}
550
551impl<T: PayloadAttributes> BuildNewPayload<T> {
552    /// Returns the payload id for the new payload.
553    pub fn payload_id(&self) -> PayloadId {
554        self.attributes.payload_id(&self.parent_hash)
555    }
556}