Skip to main content

reth_payload_builder/
service.rs

1//! Support for building payloads.
2//!
3//! The payload builder is responsible for building payloads.
4//! Once a new payload is created, it is continuously updated.
5
6use crate::{
7    metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8    PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{BlockTimestamp, B256};
12use alloy_rpc_types::engine::PayloadId;
13use futures_util::{future::FutureExt, Stream, StreamExt};
14use reth_chain_state::CanonStateNotification;
15use reth_execution_cache::SavedCache;
16use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
17use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind, PayloadTypes};
18use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
19use reth_trie_parallel::state_root_task::StateRootHandle;
20use std::{
21    future::Future,
22    pin::Pin,
23    sync::Arc,
24    task::{Context, Poll},
25};
26use tokio::sync::{
27    broadcast, mpsc,
28    oneshot::{self, Receiver},
29    watch,
30};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32use tracing::{debug, debug_span, info, trace, warn, Span};
33
34type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
35type PayloadJobEntry<Job> = (Job, PayloadId, Span);
36type ResolvePayloadResult<P, Job> = (Option<PayloadFuture<P>>, Option<PayloadJobEntry<Job>>);
37
38/// A communication channel to the [`PayloadBuilderService`] that can retrieve payloads.
39///
40/// This type is intended to be used to retrieve payloads from the service (e.g. from the engine
41/// API).
42#[derive(Debug)]
43pub struct PayloadStore<T: PayloadTypes> {
44    inner: Arc<PayloadBuilderHandle<T>>,
45}
46
47impl<T> PayloadStore<T>
48where
49    T: PayloadTypes,
50{
51    /// Resolves the payload job and returns the best payload that has been built so far.
52    ///
53    /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
54    /// job, See [`PayloadJob::resolve`].
55    pub fn resolve_kind(
56        &self,
57        id: PayloadId,
58        kind: PayloadKind,
59    ) -> impl Future<Output = Option<Result<T::BuiltPayload, PayloadBuilderError>>> {
60        self.inner.resolve_kind(id, kind)
61    }
62
63    /// Resolves the payload job and returns the best payload that has been built so far.
64    pub async fn resolve(
65        &self,
66        id: PayloadId,
67    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
68        self.resolve_kind(id, PayloadKind::Earliest).await
69    }
70
71    /// Returns the best payload for the given identifier.
72    ///
73    /// Note: this merely returns the best payload so far and does not resolve the job.
74    pub async fn best_payload(
75        &self,
76        id: PayloadId,
77    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
78        self.inner.best_payload(id).await
79    }
80
81    /// Returns the payload timestamp associated with the given identifier.
82    ///
83    /// Note: this returns the timestamp of the payload and does not resolve the job.
84    pub async fn payload_timestamp(
85        &self,
86        id: PayloadId,
87    ) -> Option<Result<u64, PayloadBuilderError>> {
88        self.inner.payload_timestamp(id).await
89    }
90
91    /// Create a new instance
92    pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
93        Self { inner: Arc::new(inner) }
94    }
95}
96
97impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
98where
99    T: PayloadTypes,
100{
101    fn from(inner: PayloadBuilderHandle<T>) -> Self {
102        Self::new(inner)
103    }
104}
105
106/// A communication channel to the [`PayloadBuilderService`].
107///
108/// This is the API used to create new payloads and to get the current state of existing ones.
109#[derive(Debug)]
110pub struct PayloadBuilderHandle<T: PayloadTypes> {
111    /// Sender half of the message channel to the [`PayloadBuilderService`].
112    to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
113}
114
115impl<T: PayloadTypes> PayloadBuilderHandle<T> {
116    /// Creates a new payload builder handle for the given channel.
117    ///
118    /// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload
119    /// building flow See [`PayloadBuilderService::poll`] for implementation details.
120    pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
121        Self { to_service }
122    }
123
124    /// Sends a message to the service to start building a new payload for the given payload.
125    ///
126    /// Returns a receiver that will receive the payload id.
127    pub fn send_new_payload(
128        &self,
129        input: BuildNewPayload<T::PayloadAttributes>,
130    ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
131        let (tx, rx) = oneshot::channel();
132        let span = debug_span!(parent: Span::current(), "payload_job");
133        let _ =
134            self.to_service.send(PayloadServiceCommand::BuildNewPayload(input.into(), span, tx));
135        rx
136    }
137
138    /// Returns the best payload for the given identifier.
139    /// Note: this does not resolve the job if it's still in progress.
140    pub async fn best_payload(
141        &self,
142        id: PayloadId,
143    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
144        let (tx, rx) = oneshot::channel();
145        self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
146        rx.await.ok()?
147    }
148
149    /// Resolves the payload job and returns the best payload that has been built so far.
150    ///
151    /// # Cancellation safety
152    ///
153    /// The future returned by this method is not cancellation-safe. This method sends the resolve
154    /// command before returning the future, so dropping the returned future drops the response
155    /// receiver and cancels the job identified by `id`.
156    pub fn resolve_kind(
157        &self,
158        id: PayloadId,
159        kind: PayloadKind,
160    ) -> impl Future<Output = Option<Result<T::BuiltPayload, PayloadBuilderError>>> {
161        let (tx, rx) = oneshot::channel();
162        let sent = self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).is_ok();
163        async move {
164            if !sent {
165                return None
166            }
167
168            match rx.await.transpose()? {
169                Ok(fut) => Some(fut.await),
170                Err(e) => Some(Err(e.into())),
171            }
172        }
173    }
174
175    /// Sends a message to the service to subscribe to payload events.
176    /// Returns a receiver that will receive them.
177    pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
178        let (tx, rx) = oneshot::channel();
179        let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
180        Ok(PayloadEvents { receiver: rx.await? })
181    }
182
183    /// Returns the payload timestamp associated with the given identifier.
184    ///
185    /// Note: this returns the timestamp of the payload and does not resolve the job.
186    pub async fn payload_timestamp(
187        &self,
188        id: PayloadId,
189    ) -> Option<Result<u64, PayloadBuilderError>> {
190        let (tx, rx) = oneshot::channel();
191        self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
192        rx.await.ok()?
193    }
194}
195
196impl<T> Clone for PayloadBuilderHandle<T>
197where
198    T: PayloadTypes,
199{
200    fn clone(&self) -> Self {
201        Self { to_service: self.to_service.clone() }
202    }
203}
204
205/// A service that manages payload building tasks.
206///
207/// This type is an endless future that manages the building of payloads.
208///
209/// It tracks active payloads and their build jobs that run in a worker pool.
210///
211/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
212/// does know nothing about how to build them, it just drives their jobs to completion.
213#[derive(Debug)]
214#[must_use = "futures do nothing unless you `.await` or poll them"]
215pub struct PayloadBuilderService<Gen, St, T>
216where
217    T: PayloadTypes,
218    Gen: PayloadJobGenerator,
219    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
220{
221    /// The type that knows how to create new payloads.
222    generator: Gen,
223    /// All active payload jobs, each accompanied by its id and the caller's tracing span
224    /// propagated across the channel so that poll and resolve work appears as children of the
225    /// original Engine API request.
226    payload_jobs: Vec<(Gen::Job, PayloadId, Span)>,
227    /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
228    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
229    /// Receiver half of the command channel.
230    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
231    /// Metrics for the payload builder service
232    metrics: PayloadBuilderServiceMetrics,
233    /// Chain events notification stream
234    chain_events: St,
235    /// Payload events handler, used to broadcast and subscribe to payload events.
236    payload_events: broadcast::Sender<Events<T>>,
237    /// We retain latest resolved payload just to make sure that we can handle repeating
238    /// requests for it gracefully.
239    cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
240    /// Sender half of the cached payload channel.
241    cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
242}
243
/// Capacity of the [`broadcast`] channel on which payload [`Events`] are published.
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
245
246// === impl PayloadBuilderService ===
247
248impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
249where
250    T: PayloadTypes,
251    Gen: PayloadJobGenerator,
252    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
253    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
254{
255    /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
256    /// with it.
257    ///
258    /// This also takes a stream of chain events that will be forwarded to the generator to apply
259    /// additional logic when new state is committed. See also
260    /// [`PayloadJobGenerator::on_new_state`].
261    pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
262        let (service_tx, command_rx) = mpsc::unbounded_channel();
263        let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
264
265        let (cached_payload_tx, cached_payload_rx) = watch::channel(None);
266
267        let service = Self {
268            generator,
269            payload_jobs: Vec::new(),
270            service_tx,
271            command_rx: UnboundedReceiverStream::new(command_rx),
272            metrics: Default::default(),
273            chain_events,
274            payload_events,
275            cached_payload_rx,
276            cached_payload_tx,
277        };
278
279        let handle = service.handle();
280        (service, handle)
281    }
282
283    /// Returns a handle to the service.
284    pub fn handle(&self) -> PayloadBuilderHandle<T> {
285        PayloadBuilderHandle::new(self.service_tx.clone())
286    }
287
288    /// Create clone on `payload_events` sending handle that could be used by builder to produce
289    /// additional events during block building
290    pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
291        self.payload_events.clone()
292    }
293
294    /// Returns true if the given payload is currently being built.
295    fn contains_payload(&self, id: PayloadId) -> bool {
296        self.payload_jobs.iter().any(|(_, job_id, _)| *job_id == id)
297    }
298
299    /// Returns the best payload for the given identifier that has been built so far.
300    fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
301        let res = self
302            .payload_jobs
303            .iter()
304            .find(|(_, job_id, _)| *job_id == id)
305            .map(|(j, _, _)| j.best_payload().map(|p| p.into()));
306        if let Some(Ok(ref best)) = res {
307            self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
308        }
309
310        res
311    }
312
313    /// Returns the best payload for the given identifier that has been built so far.
314    ///
315    /// If the job should be terminated, this removes it from active polling and returns it so the
316    /// caller can drop it after the response is sent.
317    fn resolve(
318        &mut self,
319        id: PayloadId,
320        kind: PayloadKind,
321    ) -> ResolvePayloadResult<T::BuiltPayload, Gen::Job> {
322        let start = Instant::now();
323        debug!(target: "payload_builder", %id, "resolving payload job");
324
325        if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() &&
326            *cached == id
327        {
328            self.metrics.resolve_duration_seconds.record(start.elapsed());
329            return (Some(Box::pin(core::future::ready(Ok(payload.clone())))), None);
330        }
331
332        let Some(job) = self.payload_jobs.iter().position(|(_, job_id, _)| *job_id == id) else {
333            return (None, None)
334        };
335        let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
336        let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
337
338        let resolved_job =
339            (keep_alive == KeepPayloadJobAlive::No).then(|| self.payload_jobs.swap_remove(job));
340
341        // Since the fees will not be known until the payload future is resolved / awaited, we wrap
342        // the future in a new future that will update the metrics.
343        let resolved_metrics = self.metrics.clone();
344        let payload_events = self.payload_events.clone();
345        let cached_payload_tx = self.cached_payload_tx.clone();
346
347        let fut = async move {
348            let res = fut.await;
349            resolved_metrics.resolve_duration_seconds.record(start.elapsed());
350            if let Ok(payload) = &res {
351                if payload_events.receiver_count() > 0 {
352                    payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
353                }
354
355                if let Ok(timestamp) = payload_timestamp {
356                    let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
357                }
358
359                resolved_metrics
360                    .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
361            }
362            res.map(|p| p.into())
363        };
364
365        (Some(Box::pin(fut)), resolved_job)
366    }
367
368    /// Returns the payload timestamp for the given payload.
369    fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
370        if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() &&
371            cached_id == id
372        {
373            return Some(Ok(timestamp));
374        }
375
376        let timestamp = self
377            .payload_jobs
378            .iter()
379            .find(|(_, job_id, _)| *job_id == id)
380            .map(|(j, _, _)| j.payload_timestamp());
381
382        if timestamp.is_none() {
383            trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
384        }
385
386        timestamp
387    }
388}
389
390impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
391where
392    T: PayloadTypes,
393    N: NodePrimitives,
394    Gen: PayloadJobGenerator + Unpin + 'static,
395    <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
396    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
397    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
398    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
399{
400    type Output = ();
401
402    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
403        let this = self.get_mut();
404        loop {
405            // notify the generator of new chain events
406            while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
407                this.generator.on_new_state(new_head);
408            }
409
410            // we poll all jobs first, so we always have the latest payload that we can report if
411            // requests
412            // we don't care about the order of the jobs, so we can just swap_remove them
413            for idx in (0..this.payload_jobs.len()).rev() {
414                let (mut job, id, job_span) = this.payload_jobs.swap_remove(idx);
415
416                let poll_result = {
417                    let _entered = job_span.enter();
418                    job.poll_unpin(cx)
419                };
420
421                match poll_result {
422                    Poll::Ready(Ok(_)) => {
423                        this.metrics.set_active_jobs(this.payload_jobs.len());
424                        trace!(target: "payload_builder", %id, "payload job finished");
425                    }
426                    Poll::Ready(Err(err)) => {
427                        warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
428                        this.metrics.inc_failed_jobs();
429                        this.metrics.set_active_jobs(this.payload_jobs.len());
430                    }
431                    Poll::Pending => {
432                        this.payload_jobs.push((job, id, job_span));
433                    }
434                }
435            }
436
437            // marker for exit condition
438            let mut new_job = false;
439
440            // drain all requests
441            while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
442                match cmd {
443                    PayloadServiceCommand::BuildNewPayload(input, job_span, tx) => {
444                        let id = input.payload_id();
445                        let mut res = Ok(id);
446                        let parent = input.parent_hash;
447
448                        if this.contains_payload(id) {
449                            debug!(target: "payload_builder", %id, %parent, "Payload job already in progress, ignoring.");
450                        } else {
451                            let start = Instant::now();
452                            let attributes = input.attributes.clone();
453                            let job_result = {
454                                let _entered = job_span.enter();
455                                this.generator.new_payload_job(*input, id)
456                            };
457
458                            match job_result {
459                                Ok(job) => {
460                                    this.metrics.new_job_duration_seconds.record(start.elapsed());
461                                    info!(target: "payload_builder", %id, %parent, "New payload job created");
462                                    this.metrics.inc_initiated_jobs();
463                                    new_job = true;
464                                    this.payload_jobs.push((job, id, job_span));
465                                    this.payload_events.send(Events::Attributes(attributes)).ok();
466
467                                    // Clear stale cached payload for this id so
468                                    // resolve() never returns an outdated result
469                                    // from a previous job with the same id.
470                                    if this
471                                        .cached_payload_rx
472                                        .borrow()
473                                        .as_ref()
474                                        .is_some_and(|(cached_id, _, _)| *cached_id == id)
475                                    {
476                                        trace!(target: "payload_builder", %id, "clearing stale cached payload for reused payload id");
477                                        let _ = this.cached_payload_tx.send(None);
478                                    }
479                                }
480                                Err(err) => {
481                                    this.metrics.new_job_duration_seconds.record(start.elapsed());
482                                    this.metrics.inc_failed_jobs();
483                                    warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
484                                    res = Err(err);
485                                }
486                            }
487                        }
488
489                        let _ = tx.send(res);
490                    }
491                    PayloadServiceCommand::BestPayload(id, tx) => {
492                        let _ = tx.send(this.best_payload(id));
493                    }
494                    PayloadServiceCommand::PayloadTimestamp(id, tx) => {
495                        let timestamp = this.payload_timestamp(id);
496                        let _ = tx.send(timestamp);
497                    }
498                    PayloadServiceCommand::Resolve(id, strategy, tx) => {
499                        let (payload_fut, resolved_job) = this.resolve(id, strategy);
500                        let _ = tx.send(payload_fut);
501
502                        if let Some((_job, id, _job_span)) = resolved_job {
503                            debug!(target: "payload_builder", %id, "terminated resolved job");
504                        }
505                    }
506                    PayloadServiceCommand::Subscribe(tx) => {
507                        let new_rx = this.payload_events.subscribe();
508                        let _ = tx.send(new_rx);
509                    }
510                }
511            }
512
513            if !new_job {
514                return Poll::Pending
515            }
516        }
517    }
518}
519
520/// Message type for the [`PayloadBuilderService`].
521#[derive(derive_more::Debug)]
522pub enum PayloadServiceCommand<T: PayloadTypes> {
523    /// Start building a new payload.
524    ///
525    /// Carries the caller's [`Span`] so the service can parent payload-building work under the
526    /// originating Engine API trace.
527    BuildNewPayload(
528        Box<BuildNewPayload<T::PayloadAttributes>>,
529        Span,
530        oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
531    ),
532    /// Get the best payload so far
533    BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
534    /// Get the payload timestamp for the given payload
535    PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
536    /// Resolve the payload and return the payload
537    Resolve(
538        PayloadId,
539        /* kind: */ PayloadKind,
540        #[debug(skip)] oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
541    ),
542    /// Payload service events
543    Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
544}
545
546/// A request to build a new payload.
547#[derive(Debug)]
548pub struct BuildNewPayload<T> {
549    /// The attributes for the new payload
550    pub attributes: T,
551    /// The parent hash of the new payload
552    pub parent_hash: B256,
553    /// Optional execution cache to use for the payload.
554    ///
555    /// Only provided if `--engine.share-execution-cache-with-payload-builder` is enabled.
556    pub cache: Option<SavedCache>,
557    /// Optional handle to a background sparse trie task.
558    pub trie_handle: Option<StateRootHandle>,
559}
560
561impl<T: PayloadAttributes> BuildNewPayload<T> {
562    /// Returns the payload id for the new payload.
563    pub fn payload_id(&self) -> PayloadId {
564        self.attributes.payload_id(&self.parent_hash)
565    }
566}