// Source: reth_payload_builder/service.rs

1//! Support for building payloads.
2//!
3//! The payload builder is responsible for building payloads.
4//! Once a new payload is created, it is continuously updated.
5
6use crate::{
7    metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8    PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{BlockTimestamp, B256};
12use alloy_rpc_types::engine::PayloadId;
13use futures_util::{future::FutureExt, Stream, StreamExt};
14use reth_chain_state::CanonStateNotification;
15use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
16use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind, PayloadTypes};
17use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
18use std::{
19    fmt,
20    future::Future,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24};
25use tokio::sync::{
26    broadcast, mpsc,
27    oneshot::{self, Receiver},
28    watch,
29};
30use tokio_stream::wrappers::UnboundedReceiverStream;
31use tracing::{debug, debug_span, info, trace, warn, Span};
32
33type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
34
35/// A communication channel to the [`PayloadBuilderService`] that can retrieve payloads.
36///
37/// This type is intended to be used to retrieve payloads from the service (e.g. from the engine
38/// API).
39#[derive(Debug)]
40pub struct PayloadStore<T: PayloadTypes> {
41    inner: Arc<PayloadBuilderHandle<T>>,
42}
43
44impl<T> PayloadStore<T>
45where
46    T: PayloadTypes,
47{
48    /// Resolves the payload job and returns the best payload that has been built so far.
49    ///
50    /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
51    /// job, See [`PayloadJob::resolve`].
52    pub async fn resolve_kind(
53        &self,
54        id: PayloadId,
55        kind: PayloadKind,
56    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
57        self.inner.resolve_kind(id, kind).await
58    }
59
60    /// Resolves the payload job and returns the best payload that has been built so far.
61    pub async fn resolve(
62        &self,
63        id: PayloadId,
64    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
65        self.resolve_kind(id, PayloadKind::Earliest).await
66    }
67
68    /// Returns the best payload for the given identifier.
69    ///
70    /// Note: this merely returns the best payload so far and does not resolve the job.
71    pub async fn best_payload(
72        &self,
73        id: PayloadId,
74    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
75        self.inner.best_payload(id).await
76    }
77
78    /// Returns the payload timestamp associated with the given identifier.
79    ///
80    /// Note: this returns the timestamp of the payload and does not resolve the job.
81    pub async fn payload_timestamp(
82        &self,
83        id: PayloadId,
84    ) -> Option<Result<u64, PayloadBuilderError>> {
85        self.inner.payload_timestamp(id).await
86    }
87
88    /// Create a new instance
89    pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
90        Self { inner: Arc::new(inner) }
91    }
92}
93
94impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
95where
96    T: PayloadTypes,
97{
98    fn from(inner: PayloadBuilderHandle<T>) -> Self {
99        Self::new(inner)
100    }
101}
102
103/// A communication channel to the [`PayloadBuilderService`].
104///
105/// This is the API used to create new payloads and to get the current state of existing ones.
106#[derive(Debug)]
107pub struct PayloadBuilderHandle<T: PayloadTypes> {
108    /// Sender half of the message channel to the [`PayloadBuilderService`].
109    to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
110}
111
112impl<T: PayloadTypes> PayloadBuilderHandle<T> {
113    /// Creates a new payload builder handle for the given channel.
114    ///
115    /// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload
116    /// building flow See [`PayloadBuilderService::poll`] for implementation details.
117    pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
118        Self { to_service }
119    }
120
121    /// Sends a message to the service to start building a new payload for the given payload.
122    ///
123    /// Returns a receiver that will receive the payload id.
124    pub fn send_new_payload(
125        &self,
126        parent: B256,
127        attr: T::PayloadAttributes,
128    ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
129        let (tx, rx) = oneshot::channel();
130        let job_span = debug_span!(parent: Span::current(), "payload_job");
131        let _ = self
132            .to_service
133            .send(PayloadServiceCommand::BuildNewPayload(parent, attr, job_span, tx));
134        rx
135    }
136
137    /// Returns the best payload for the given identifier.
138    /// Note: this does not resolve the job if it's still in progress.
139    pub async fn best_payload(
140        &self,
141        id: PayloadId,
142    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
143        let (tx, rx) = oneshot::channel();
144        self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
145        rx.await.ok()?
146    }
147
148    /// Resolves the payload job and returns the best payload that has been built so far.
149    pub async fn resolve_kind(
150        &self,
151        id: PayloadId,
152        kind: PayloadKind,
153    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
154        let (tx, rx) = oneshot::channel();
155        self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
156        match rx.await.transpose()? {
157            Ok(fut) => Some(fut.await),
158            Err(e) => Some(Err(e.into())),
159        }
160    }
161
162    /// Sends a message to the service to subscribe to payload events.
163    /// Returns a receiver that will receive them.
164    pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
165        let (tx, rx) = oneshot::channel();
166        let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
167        Ok(PayloadEvents { receiver: rx.await? })
168    }
169
170    /// Returns the payload timestamp associated with the given identifier.
171    ///
172    /// Note: this returns the timestamp of the payload and does not resolve the job.
173    pub async fn payload_timestamp(
174        &self,
175        id: PayloadId,
176    ) -> Option<Result<u64, PayloadBuilderError>> {
177        let (tx, rx) = oneshot::channel();
178        self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
179        rx.await.ok()?
180    }
181}
182
183impl<T> Clone for PayloadBuilderHandle<T>
184where
185    T: PayloadTypes,
186{
187    fn clone(&self) -> Self {
188        Self { to_service: self.to_service.clone() }
189    }
190}
191
192/// A service that manages payload building tasks.
193///
194/// This type is an endless future that manages the building of payloads.
195///
196/// It tracks active payloads and their build jobs that run in a worker pool.
197///
198/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
199/// does know nothing about how to build them, it just drives their jobs to completion.
200#[derive(Debug)]
201#[must_use = "futures do nothing unless you `.await` or poll them"]
202pub struct PayloadBuilderService<Gen, St, T>
203where
204    T: PayloadTypes,
205    Gen: PayloadJobGenerator,
206    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
207{
208    /// The type that knows how to create new payloads.
209    generator: Gen,
210    /// All active payload jobs, each accompanied by its id and the caller's tracing span
211    /// propagated across the channel so that poll and resolve work appears as children of the
212    /// original Engine API request.
213    payload_jobs: Vec<(Gen::Job, PayloadId, Span)>,
214    /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
215    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
216    /// Receiver half of the command channel.
217    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
218    /// Metrics for the payload builder service
219    metrics: PayloadBuilderServiceMetrics,
220    /// Chain events notification stream
221    chain_events: St,
222    /// Payload events handler, used to broadcast and subscribe to payload events.
223    payload_events: broadcast::Sender<Events<T>>,
224    /// We retain latest resolved payload just to make sure that we can handle repeating
225    /// requests for it gracefully.
226    cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
227    /// Sender half of the cached payload channel.
228    cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
229}
230
231const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
232
233// === impl PayloadBuilderService ===
234
235impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
236where
237    T: PayloadTypes,
238    Gen: PayloadJobGenerator,
239    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
240    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
241{
242    /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
243    /// with it.
244    ///
245    /// This also takes a stream of chain events that will be forwarded to the generator to apply
246    /// additional logic when new state is committed. See also
247    /// [`PayloadJobGenerator::on_new_state`].
248    pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
249        let (service_tx, command_rx) = mpsc::unbounded_channel();
250        let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
251
252        let (cached_payload_tx, cached_payload_rx) = watch::channel(None);
253
254        let service = Self {
255            generator,
256            payload_jobs: Vec::new(),
257            service_tx,
258            command_rx: UnboundedReceiverStream::new(command_rx),
259            metrics: Default::default(),
260            chain_events,
261            payload_events,
262            cached_payload_rx,
263            cached_payload_tx,
264        };
265
266        let handle = service.handle();
267        (service, handle)
268    }
269
270    /// Returns a handle to the service.
271    pub fn handle(&self) -> PayloadBuilderHandle<T> {
272        PayloadBuilderHandle::new(self.service_tx.clone())
273    }
274
275    /// Create clone on `payload_events` sending handle that could be used by builder to produce
276    /// additional events during block building
277    pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
278        self.payload_events.clone()
279    }
280
281    /// Returns true if the given payload is currently being built.
282    fn contains_payload(&self, id: PayloadId) -> bool {
283        self.payload_jobs.iter().any(|(_, job_id, _)| *job_id == id)
284    }
285
286    /// Returns the best payload for the given identifier that has been built so far.
287    fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
288        let res = self
289            .payload_jobs
290            .iter()
291            .find(|(_, job_id, _)| *job_id == id)
292            .map(|(j, _, _)| j.best_payload().map(|p| p.into()));
293        if let Some(Ok(ref best)) = res {
294            self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
295        }
296
297        res
298    }
299
300    /// Returns the best payload for the given identifier that has been built so far and terminates
301    /// the job if requested.
302    fn resolve(
303        &mut self,
304        id: PayloadId,
305        kind: PayloadKind,
306    ) -> Option<PayloadFuture<T::BuiltPayload>> {
307        let start = Instant::now();
308        debug!(target: "payload_builder", %id, "resolving payload job");
309
310        if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() &&
311            *cached == id
312        {
313            self.metrics.resolve_duration_seconds.record(start.elapsed());
314            return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
315        }
316
317        let job = self.payload_jobs.iter().position(|(_, job_id, _)| *job_id == id)?;
318        let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
319        let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
320
321        if keep_alive == KeepPayloadJobAlive::No {
322            let (_, id, _) = self.payload_jobs.swap_remove(job);
323            debug!(target: "payload_builder", %id, "terminated resolved job");
324        }
325
326        // Since the fees will not be known until the payload future is resolved / awaited, we wrap
327        // the future in a new future that will update the metrics.
328        let resolved_metrics = self.metrics.clone();
329        let payload_events = self.payload_events.clone();
330        let cached_payload_tx = self.cached_payload_tx.clone();
331
332        let fut = async move {
333            let res = fut.await;
334            resolved_metrics.resolve_duration_seconds.record(start.elapsed());
335            if let Ok(payload) = &res {
336                if payload_events.receiver_count() > 0 {
337                    payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
338                }
339
340                if let Ok(timestamp) = payload_timestamp {
341                    let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
342                }
343
344                resolved_metrics
345                    .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
346            }
347            res.map(|p| p.into())
348        };
349
350        Some(Box::pin(fut))
351    }
352
353    /// Returns the payload timestamp for the given payload.
354    fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
355        if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() &&
356            cached_id == id
357        {
358            return Some(Ok(timestamp));
359        }
360
361        let timestamp = self
362            .payload_jobs
363            .iter()
364            .find(|(_, job_id, _)| *job_id == id)
365            .map(|(j, _, _)| j.payload_timestamp());
366
367        if timestamp.is_none() {
368            trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
369        }
370
371        timestamp
372    }
373}
374
375impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
376where
377    T: PayloadTypes,
378    N: NodePrimitives,
379    Gen: PayloadJobGenerator + Unpin + 'static,
380    <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
381    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
382    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
383    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
384{
385    type Output = ();
386
387    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
388        let this = self.get_mut();
389        loop {
390            // notify the generator of new chain events
391            while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
392                this.generator.on_new_state(new_head);
393            }
394
395            // we poll all jobs first, so we always have the latest payload that we can report if
396            // requests
397            // we don't care about the order of the jobs, so we can just swap_remove them
398            for idx in (0..this.payload_jobs.len()).rev() {
399                let (mut job, id, job_span) = this.payload_jobs.swap_remove(idx);
400
401                let poll_result = {
402                    let _entered = job_span.enter();
403                    job.poll_unpin(cx)
404                };
405
406                match poll_result {
407                    Poll::Ready(Ok(_)) => {
408                        this.metrics.set_active_jobs(this.payload_jobs.len());
409                        trace!(target: "payload_builder", %id, "payload job finished");
410                    }
411                    Poll::Ready(Err(err)) => {
412                        warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
413                        this.metrics.inc_failed_jobs();
414                        this.metrics.set_active_jobs(this.payload_jobs.len());
415                    }
416                    Poll::Pending => {
417                        this.payload_jobs.push((job, id, job_span));
418                    }
419                }
420            }
421
422            // marker for exit condition
423            let mut new_job = false;
424
425            // drain all requests
426            while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
427                match cmd {
428                    PayloadServiceCommand::BuildNewPayload(parent, attr, job_span, tx) => {
429                        let id = attr.payload_id(&parent);
430                        let mut res = Ok(id);
431
432                        if this.contains_payload(id) {
433                            debug!(target: "payload_builder", %id, %parent, "Payload job already in progress, ignoring.");
434                        } else {
435                            let start = Instant::now();
436                            let job_result = {
437                                let _entered = job_span.enter();
438                                this.generator.new_payload_job(parent, attr.clone(), id)
439                            };
440
441                            match job_result {
442                                Ok(job) => {
443                                    this.metrics.new_job_duration_seconds.record(start.elapsed());
444                                    info!(target: "payload_builder", %id, %parent, "New payload job created");
445                                    this.metrics.inc_initiated_jobs();
446                                    new_job = true;
447                                    this.payload_jobs.push((job, id, job_span));
448                                    this.payload_events.send(Events::Attributes(attr)).ok();
449
450                                    // Clear stale cached payload for this id so
451                                    // resolve() never returns an outdated result
452                                    // from a previous job with the same id.
453                                    if this
454                                        .cached_payload_rx
455                                        .borrow()
456                                        .as_ref()
457                                        .is_some_and(|(cached_id, _, _)| *cached_id == id)
458                                    {
459                                        trace!(target: "payload_builder", %id, "clearing stale cached payload for reused payload id");
460                                        let _ = this.cached_payload_tx.send(None);
461                                    }
462                                }
463                                Err(err) => {
464                                    this.metrics.new_job_duration_seconds.record(start.elapsed());
465                                    this.metrics.inc_failed_jobs();
466                                    warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
467                                    res = Err(err);
468                                }
469                            }
470                        }
471
472                        let _ = tx.send(res);
473                    }
474                    PayloadServiceCommand::BestPayload(id, tx) => {
475                        let _ = tx.send(this.best_payload(id));
476                    }
477                    PayloadServiceCommand::PayloadTimestamp(id, tx) => {
478                        let timestamp = this.payload_timestamp(id);
479                        let _ = tx.send(timestamp);
480                    }
481                    PayloadServiceCommand::Resolve(id, strategy, tx) => {
482                        let _ = tx.send(this.resolve(id, strategy));
483                    }
484                    PayloadServiceCommand::Subscribe(tx) => {
485                        let new_rx = this.payload_events.subscribe();
486                        let _ = tx.send(new_rx);
487                    }
488                }
489            }
490
491            if !new_job {
492                return Poll::Pending
493            }
494        }
495    }
496}
497
498/// Message type for the [`PayloadBuilderService`].
499pub enum PayloadServiceCommand<T: PayloadTypes> {
500    /// Start building a new payload.
501    ///
502    /// Carries the caller's [`Span`] so the service can parent payload-building work under the
503    /// originating Engine API trace.
504    BuildNewPayload(
505        B256,
506        T::PayloadAttributes,
507        Span,
508        oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
509    ),
510    /// Get the best payload so far
511    BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
512    /// Get the payload timestamp for the given payload
513    PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
514    /// Resolve the payload and return the payload
515    Resolve(
516        PayloadId,
517        /* kind: */ PayloadKind,
518        oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
519    ),
520    /// Payload service events
521    Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
522}
523
524impl<T> fmt::Debug for PayloadServiceCommand<T>
525where
526    T: PayloadTypes,
527{
528    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
529        match self {
530            Self::BuildNewPayload(f0, f1, _, f2) => {
531                f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).field(&f2).finish()
532            }
533            Self::BestPayload(f0, f1) => {
534                f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
535            }
536            Self::PayloadTimestamp(f0, f1) => {
537                f.debug_tuple("PayloadTimestamp").field(&f0).field(&f1).finish()
538            }
539            Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
540            Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
541        }
542    }
543}