reth_payload_builder/
service.rs

1//! Support for building payloads.
2//!
3//! The payload builder is responsible for building payloads.
4//! Once a new payload is created, it is continuously updated.
5
6use crate::{
7    metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8    PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_rpc_types::engine::PayloadId;
12use futures_util::{future::FutureExt, Stream, StreamExt};
13use reth_chain_state::CanonStateNotification;
14use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
15use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
16use reth_primitives_traits::NodePrimitives;
17use std::{
18    fmt,
19    future::Future,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23};
24use tokio::sync::{
25    broadcast, mpsc,
26    oneshot::{self, Receiver},
27};
28use tokio_stream::wrappers::UnboundedReceiverStream;
29use tracing::{debug, info, trace, warn};
30
31type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
32
33/// A communication channel to the [`PayloadBuilderService`] that can retrieve payloads.
34///
35/// This type is intended to be used to retrieve payloads from the service (e.g. from the engine
36/// API).
37#[derive(Debug)]
38pub struct PayloadStore<T: PayloadTypes> {
39    inner: Arc<PayloadBuilderHandle<T>>,
40}
41
42impl<T> PayloadStore<T>
43where
44    T: PayloadTypes,
45{
46    /// Resolves the payload job and returns the best payload that has been built so far.
47    ///
48    /// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
49    /// job, See [`PayloadJob::resolve`].
50    pub async fn resolve_kind(
51        &self,
52        id: PayloadId,
53        kind: PayloadKind,
54    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
55        self.inner.resolve_kind(id, kind).await
56    }
57
58    /// Resolves the payload job and returns the best payload that has been built so far.
59    pub async fn resolve(
60        &self,
61        id: PayloadId,
62    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
63        self.resolve_kind(id, PayloadKind::Earliest).await
64    }
65
66    /// Returns the best payload for the given identifier.
67    ///
68    /// Note: this merely returns the best payload so far and does not resolve the job.
69    pub async fn best_payload(
70        &self,
71        id: PayloadId,
72    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
73        self.inner.best_payload(id).await
74    }
75
76    /// Returns the payload timestamp associated with the given identifier.
77    ///
78    /// Note: this returns the timestamp of the payload and does not resolve the job.
79    pub async fn payload_timestamp(
80        &self,
81        id: PayloadId,
82    ) -> Option<Result<u64, PayloadBuilderError>> {
83        self.inner.payload_timestamp(id).await
84    }
85}
86
87impl<T> PayloadStore<T>
88where
89    T: PayloadTypes,
90{
91    /// Create a new instance
92    pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
93        Self { inner: Arc::new(inner) }
94    }
95}
96
97impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
98where
99    T: PayloadTypes,
100{
101    fn from(inner: PayloadBuilderHandle<T>) -> Self {
102        Self::new(inner)
103    }
104}
105
106/// A communication channel to the [`PayloadBuilderService`].
107///
108/// This is the API used to create new payloads and to get the current state of existing ones.
109#[derive(Debug)]
110pub struct PayloadBuilderHandle<T: PayloadTypes> {
111    /// Sender half of the message channel to the [`PayloadBuilderService`].
112    to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
113}
114
115impl<T: PayloadTypes> PayloadBuilderHandle<T> {
116    /// Creates a new payload builder handle for the given channel.
117    ///
118    /// Note: this is only used internally by the [`PayloadBuilderService`] to manage the payload
119    /// building flow See [`PayloadBuilderService::poll`] for implementation details.
120    pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
121        Self { to_service }
122    }
123
124    /// Sends a message to the service to start building a new payload for the given payload.
125    ///
126    /// Returns a receiver that will receive the payload id.
127    pub fn send_new_payload(
128        &self,
129        attr: T::PayloadBuilderAttributes,
130    ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
131        let (tx, rx) = oneshot::channel();
132        let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
133        rx
134    }
135
136    /// Returns the best payload for the given identifier.
137    /// Note: this does not resolve the job if it's still in progress.
138    pub async fn best_payload(
139        &self,
140        id: PayloadId,
141    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
142        let (tx, rx) = oneshot::channel();
143        self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
144        rx.await.ok()?
145    }
146
147    /// Resolves the payload job and returns the best payload that has been built so far.
148    pub async fn resolve_kind(
149        &self,
150        id: PayloadId,
151        kind: PayloadKind,
152    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
153        let (tx, rx) = oneshot::channel();
154        self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
155        match rx.await.transpose()? {
156            Ok(fut) => Some(fut.await),
157            Err(e) => Some(Err(e.into())),
158        }
159    }
160
161    /// Sends a message to the service to subscribe to payload events.
162    /// Returns a receiver that will receive them.
163    pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
164        let (tx, rx) = oneshot::channel();
165        let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
166        Ok(PayloadEvents { receiver: rx.await? })
167    }
168
169    /// Returns the payload timestamp associated with the given identifier.
170    ///
171    /// Note: this returns the timestamp of the payload and does not resolve the job.
172    pub async fn payload_timestamp(
173        &self,
174        id: PayloadId,
175    ) -> Option<Result<u64, PayloadBuilderError>> {
176        let (tx, rx) = oneshot::channel();
177        self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
178        rx.await.ok()?
179    }
180}
181
182impl<T> Clone for PayloadBuilderHandle<T>
183where
184    T: PayloadTypes,
185{
186    fn clone(&self) -> Self {
187        Self { to_service: self.to_service.clone() }
188    }
189}
190
191/// A service that manages payload building tasks.
192///
193/// This type is an endless future that manages the building of payloads.
194///
195/// It tracks active payloads and their build jobs that run in a worker pool.
196///
197/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
198/// does know nothing about how to build them, it just drives their jobs to completion.
199#[derive(Debug)]
200#[must_use = "futures do nothing unless you `.await` or poll them"]
201pub struct PayloadBuilderService<Gen, St, T>
202where
203    T: PayloadTypes,
204    Gen: PayloadJobGenerator,
205    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
206{
207    /// The type that knows how to create new payloads.
208    generator: Gen,
209    /// All active payload jobs.
210    payload_jobs: Vec<(Gen::Job, PayloadId)>,
211    /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
212    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
213    /// Receiver half of the command channel.
214    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
215    /// Metrics for the payload builder service
216    metrics: PayloadBuilderServiceMetrics,
217    /// Chain events notification stream
218    chain_events: St,
219    /// Payload events handler, used to broadcast and subscribe to payload events.
220    payload_events: broadcast::Sender<Events<T>>,
221}
222
223const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
224
225// === impl PayloadBuilderService ===
226
227impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
228where
229    T: PayloadTypes,
230    Gen: PayloadJobGenerator,
231    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
232    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
233{
234    /// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
235    /// with it.
236    ///
237    /// This also takes a stream of chain events that will be forwarded to the generator to apply
238    /// additional logic when new state is committed. See also
239    /// [`PayloadJobGenerator::on_new_state`].
240    pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
241        let (service_tx, command_rx) = mpsc::unbounded_channel();
242        let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
243
244        let service = Self {
245            generator,
246            payload_jobs: Vec::new(),
247            service_tx,
248            command_rx: UnboundedReceiverStream::new(command_rx),
249            metrics: Default::default(),
250            chain_events,
251            payload_events,
252        };
253
254        let handle = service.handle();
255        (service, handle)
256    }
257
258    /// Returns a handle to the service.
259    pub fn handle(&self) -> PayloadBuilderHandle<T> {
260        PayloadBuilderHandle::new(self.service_tx.clone())
261    }
262
263    /// Create clone on `payload_events` sending handle that could be used by builder to produce
264    /// additional events during block building
265    pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
266        self.payload_events.clone()
267    }
268
269    /// Returns true if the given payload is currently being built.
270    fn contains_payload(&self, id: PayloadId) -> bool {
271        self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
272    }
273
274    /// Returns the best payload for the given identifier that has been built so far.
275    fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
276        let res = self
277            .payload_jobs
278            .iter()
279            .find(|(_, job_id)| *job_id == id)
280            .map(|(j, _)| j.best_payload().map(|p| p.into()));
281        if let Some(Ok(ref best)) = res {
282            self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
283        }
284
285        res
286    }
287
288    /// Returns the best payload for the given identifier that has been built so far and terminates
289    /// the job if requested.
290    fn resolve(
291        &mut self,
292        id: PayloadId,
293        kind: PayloadKind,
294    ) -> Option<PayloadFuture<T::BuiltPayload>> {
295        debug!(target: "payload_builder", %id, "resolving payload job");
296
297        let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
298        let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
299
300        if keep_alive == KeepPayloadJobAlive::No {
301            let (_, id) = self.payload_jobs.swap_remove(job);
302            debug!(target: "payload_builder", %id, "terminated resolved job");
303        }
304
305        // Since the fees will not be known until the payload future is resolved / awaited, we wrap
306        // the future in a new future that will update the metrics.
307        let resolved_metrics = self.metrics.clone();
308        let payload_events = self.payload_events.clone();
309
310        let fut = async move {
311            let res = fut.await;
312            if let Ok(payload) = &res {
313                if payload_events.receiver_count() > 0 {
314                    payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
315                }
316
317                resolved_metrics
318                    .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
319            }
320            res.map(|p| p.into())
321        };
322
323        Some(Box::pin(fut))
324    }
325}
326
327impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
328where
329    T: PayloadTypes,
330    Gen: PayloadJobGenerator,
331    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
332    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
333{
334    /// Returns the payload timestamp for the given payload.
335    fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
336        let timestamp = self
337            .payload_jobs
338            .iter()
339            .find(|(_, job_id)| *job_id == id)
340            .map(|(j, _)| j.payload_timestamp());
341
342        if timestamp.is_none() {
343            trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
344        }
345
346        timestamp
347    }
348}
349
350impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
351where
352    T: PayloadTypes,
353    N: NodePrimitives,
354    Gen: PayloadJobGenerator + Unpin + 'static,
355    <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
356    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
357    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
358    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
359{
360    type Output = ();
361
362    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
363        let this = self.get_mut();
364        loop {
365            // notify the generator of new chain events
366            while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
367                this.generator.on_new_state(new_head);
368            }
369
370            // we poll all jobs first, so we always have the latest payload that we can report if
371            // requests
372            // we don't care about the order of the jobs, so we can just swap_remove them
373            for idx in (0..this.payload_jobs.len()).rev() {
374                let (mut job, id) = this.payload_jobs.swap_remove(idx);
375
376                // drain better payloads from the job
377                match job.poll_unpin(cx) {
378                    Poll::Ready(Ok(_)) => {
379                        this.metrics.set_active_jobs(this.payload_jobs.len());
380                        trace!(target: "payload_builder", %id, "payload job finished");
381                    }
382                    Poll::Ready(Err(err)) => {
383                        warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
384                        this.metrics.inc_failed_jobs();
385                        this.metrics.set_active_jobs(this.payload_jobs.len());
386                    }
387                    Poll::Pending => {
388                        // still pending, put it back
389                        this.payload_jobs.push((job, id));
390                    }
391                }
392            }
393
394            // marker for exit condition
395            let mut new_job = false;
396
397            // drain all requests
398            while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
399                match cmd {
400                    PayloadServiceCommand::BuildNewPayload(attr, tx) => {
401                        let id = attr.payload_id();
402                        let mut res = Ok(id);
403
404                        if this.contains_payload(id) {
405                            debug!(target: "payload_builder",%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
406                        } else {
407                            // no job for this payload yet, create one
408                            let parent = attr.parent();
409                            match this.generator.new_payload_job(attr.clone()) {
410                                Ok(job) => {
411                                    info!(target: "payload_builder", %id, %parent, "New payload job created");
412                                    this.metrics.inc_initiated_jobs();
413                                    new_job = true;
414                                    this.payload_jobs.push((job, id));
415                                    this.payload_events.send(Events::Attributes(attr.clone())).ok();
416                                }
417                                Err(err) => {
418                                    this.metrics.inc_failed_jobs();
419                                    warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
420                                    res = Err(err);
421                                }
422                            }
423                        }
424
425                        // return the id of the payload
426                        let _ = tx.send(res);
427                    }
428                    PayloadServiceCommand::BestPayload(id, tx) => {
429                        let _ = tx.send(this.best_payload(id));
430                    }
431                    PayloadServiceCommand::PayloadTimestamp(id, tx) => {
432                        let timestamp = this.payload_timestamp(id);
433                        let _ = tx.send(timestamp);
434                    }
435                    PayloadServiceCommand::Resolve(id, strategy, tx) => {
436                        let _ = tx.send(this.resolve(id, strategy));
437                    }
438                    PayloadServiceCommand::Subscribe(tx) => {
439                        let new_rx = this.payload_events.subscribe();
440                        let _ = tx.send(new_rx);
441                    }
442                }
443            }
444
445            if !new_job {
446                return Poll::Pending
447            }
448        }
449    }
450}
451
452/// Message type for the [`PayloadBuilderService`].
453pub enum PayloadServiceCommand<T: PayloadTypes> {
454    /// Start building a new payload.
455    BuildNewPayload(
456        T::PayloadBuilderAttributes,
457        oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
458    ),
459    /// Get the best payload so far
460    BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
461    /// Get the payload timestamp for the given payload
462    PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
463    /// Resolve the payload and return the payload
464    Resolve(
465        PayloadId,
466        /* kind: */ PayloadKind,
467        oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
468    ),
469    /// Payload service events
470    Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
471}
472
473impl<T> fmt::Debug for PayloadServiceCommand<T>
474where
475    T: PayloadTypes,
476{
477    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
478        match self {
479            Self::BuildNewPayload(f0, f1) => {
480                f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
481            }
482            Self::BestPayload(f0, f1) => {
483                f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
484            }
485            Self::PayloadTimestamp(f0, f1) => {
486                f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
487            }
488            Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
489            Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
490        }
491    }
492}