Skip to main content

reth_basic_payload_builder/
lib.rs

1//! A basic payload generator for reth.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use crate::metrics::PayloadBuilderMetrics;
12use alloy_eips::merge::SLOT_DURATION;
13use alloy_primitives::{B256, U256};
14use futures_core::ready;
15use futures_util::FutureExt;
16use reth_chain_state::CanonStateNotification;
17use reth_execution_cache::SavedCache;
18use reth_payload_builder::{
19    BuildNewPayload, KeepPayloadJobAlive, PayloadId, PayloadJob, PayloadJobGenerator,
20};
21use reth_payload_builder_primitives::PayloadBuilderError;
22use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind};
23use reth_primitives_traits::{HeaderTy, NodePrimitives, SealedHeader};
24use reth_revm::{cached::CachedReads, cancelled::CancelOnDrop};
25use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
26use reth_tasks::Runtime;
27use reth_trie_parallel::state_root_task::StateRootHandle;
28use std::{
29    fmt,
30    future::Future,
31    ops::Deref,
32    pin::Pin,
33    sync::Arc,
34    task::{Context, Poll},
35    time::{Duration, SystemTime, UNIX_EPOCH},
36};
37use tokio::{
38    sync::{oneshot, Semaphore},
39    time::{Interval, Sleep},
40};
41use tracing::{debug, trace, warn};
42
43mod better_payload_emitter;
44mod metrics;
45mod stack;
46
47pub use better_payload_emitter::BetterPayloadEmitter;
48pub use stack::PayloadBuilderStack;
49
50const PAYLOAD_BUILDER_THREAD_NAME: &str = "payload-builder";
51
52/// Helper to access [`NodePrimitives::BlockHeader`] from [`PayloadBuilder::BuiltPayload`].
53pub type HeaderForPayload<P> = <<P as BuiltPayload>::Primitives as NodePrimitives>::BlockHeader;
54
55/// The [`PayloadJobGenerator`] that creates [`BasicPayloadJob`]s.
56#[derive(Debug)]
57pub struct BasicPayloadJobGenerator<Client, Builder> {
58    /// The client that can interact with the chain.
59    client: Client,
60    /// The task executor to spawn payload building tasks on.
61    executor: Runtime,
62    /// The configuration for the job generator.
63    config: BasicPayloadJobGeneratorConfig,
64    /// Restricts how many generator tasks can be executed at once.
65    payload_task_guard: PayloadTaskGuard,
66    /// The type responsible for building payloads.
67    ///
68    /// See [`PayloadBuilder`]
69    builder: Builder,
70    /// Stored `cached_reads` for new payload jobs.
71    pre_cached: Option<PrecachedState>,
72}
73
74// === impl BasicPayloadJobGenerator ===
75
76impl<Client, Builder> BasicPayloadJobGenerator<Client, Builder> {
77    /// Creates a new [`BasicPayloadJobGenerator`] with the given config and custom
78    /// [`PayloadBuilder`]
79    pub fn with_builder(
80        client: Client,
81        executor: Runtime,
82        config: BasicPayloadJobGeneratorConfig,
83        builder: Builder,
84    ) -> Self {
85        Self {
86            client,
87            executor,
88            payload_task_guard: PayloadTaskGuard::new(config.max_payload_tasks),
89            config,
90            builder,
91            pre_cached: None,
92        }
93    }
94
95    /// Returns the maximum duration a job should be allowed to run.
96    ///
97    /// This adheres to the following specification:
98    /// > Client software SHOULD stop the updating process when either a call to engine_getPayload
99    /// > with the build process's payloadId is made or SECONDS_PER_SLOT (12s in the Mainnet
100    /// > configuration) have passed since the point in time identified by the timestamp parameter.
101    ///
102    /// See also <https://github.com/ethereum/execution-apis/blob/431cf72fd3403d946ca3e3afc36b973fc87e0e89/src/engine/paris.md?plain=1#L137>
103    #[inline]
104    fn max_job_duration(&self, unix_timestamp: u64) -> Duration {
105        let duration_until_timestamp = duration_until(unix_timestamp);
106
107        // safety in case clocks are bad
108        let duration_until_timestamp = duration_until_timestamp.min(self.config.deadline * 3);
109
110        self.config.deadline + duration_until_timestamp
111    }
112
113    /// Returns the [Instant](tokio::time::Instant) at which the job should be terminated because it
114    /// is considered timed out.
115    #[inline]
116    fn job_deadline(&self, unix_timestamp: u64) -> tokio::time::Instant {
117        tokio::time::Instant::now() + self.max_job_duration(unix_timestamp)
118    }
119
120    /// Returns a reference to the tasks type
121    pub const fn tasks(&self) -> &Runtime {
122        &self.executor
123    }
124
125    /// Returns the pre-cached reads for the given parent header if it matches the cached state's
126    /// block.
127    fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
128        self.pre_cached.as_ref().filter(|pc| pc.block == parent).map(|pc| pc.cached.clone())
129    }
130}
131
132// === impl BasicPayloadJobGenerator ===
133
134impl<Client, Builder> PayloadJobGenerator for BasicPayloadJobGenerator<Client, Builder>
135where
136    Client: StateProviderFactory
137        + BlockReaderIdExt<Header = HeaderForPayload<Builder::BuiltPayload>>
138        + Clone
139        + Unpin
140        + 'static,
141    Builder: PayloadBuilder + Unpin + 'static,
142    Builder::Attributes: Unpin + Clone,
143    Builder::BuiltPayload: Unpin + Clone,
144{
145    type Job = BasicPayloadJob<Builder>;
146
147    fn new_payload_job(
148        &self,
149        input: BuildNewPayload<Builder::Attributes>,
150        id: PayloadId,
151    ) -> Result<Self::Job, PayloadBuilderError> {
152        let parent_header = if input.parent_hash.is_zero() {
153            // Use latest header for genesis block case
154            self.client
155                .latest_header()
156                .map_err(PayloadBuilderError::from)?
157                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(B256::ZERO))?
158        } else {
159            // Fetch specific header by hash
160            self.client
161                .sealed_header_by_hash(input.parent_hash)
162                .map_err(PayloadBuilderError::from)?
163                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(input.parent_hash))?
164        };
165
166        let cached_reads = self.maybe_pre_cached(parent_header.hash());
167
168        let config = PayloadConfig::new(Arc::new(parent_header), input.attributes, id);
169
170        let until = self.job_deadline(config.attributes.timestamp());
171        let deadline = Box::pin(tokio::time::sleep_until(until));
172
173        let mut job = BasicPayloadJob {
174            config,
175            executor: self.executor.clone(),
176            deadline,
177            // ticks immediately
178            interval: tokio::time::interval(self.config.interval),
179            best_payload: PayloadState::Missing,
180            pending_block: None,
181            cached_reads,
182            execution_cache: input.cache,
183            trie_handle: input.trie_handle,
184            payload_task_guard: self.payload_task_guard.clone(),
185            metrics: Default::default(),
186            builder: self.builder.clone(),
187        };
188
189        // start the first job right away
190        job.spawn_build_job();
191
192        Ok(job)
193    }
194
195    fn on_new_state<N: NodePrimitives>(&mut self, new_state: CanonStateNotification<N>) {
196        let mut cached = CachedReads::default();
197
198        // extract the state from the notification and put it into the cache
199        let committed = new_state.committed();
200        let new_execution_outcome = committed.execution_outcome();
201        for (addr, acc) in new_execution_outcome.bundle_accounts_iter() {
202            if let Some(info) = acc.info.clone() {
203                // we want pre cache existing accounts and their storage
204                // this only includes changed accounts and storage but is better than nothing
205                let storage =
206                    acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
207                cached.insert_account(addr, info, storage);
208            }
209        }
210
211        self.pre_cached = Some(PrecachedState { block: committed.tip().hash(), cached });
212    }
213}
214
215/// Pre-filled [`CachedReads`] for a specific block.
216///
217/// This is extracted from the [`CanonStateNotification`] for the tip block.
218#[derive(Debug, Clone)]
219pub struct PrecachedState {
220    /// The block for which the state is pre-cached.
221    pub block: B256,
222    /// Cached state for the block.
223    pub cached: CachedReads,
224}
225
226/// Restricts how many generator tasks can be executed at once.
227#[derive(Debug, Clone)]
228pub struct PayloadTaskGuard(Arc<Semaphore>);
229
230impl Deref for PayloadTaskGuard {
231    type Target = Semaphore;
232
233    fn deref(&self) -> &Self::Target {
234        &self.0
235    }
236}
237
238// === impl PayloadTaskGuard ===
239
240impl PayloadTaskGuard {
241    /// Constructs `Self` with a maximum task count of `max_payload_tasks`.
242    pub fn new(max_payload_tasks: usize) -> Self {
243        Self(Arc::new(Semaphore::new(max_payload_tasks)))
244    }
245
246    /// Acquires an owned permit for a payload build task.
247    async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
248        self.0.clone().acquire_owned().await.expect("payload task semaphore closed")
249    }
250}
251
252/// Settings for the [`BasicPayloadJobGenerator`].
253#[derive(Debug, Clone)]
254pub struct BasicPayloadJobGeneratorConfig {
255    /// The interval at which the job should build a new payload after the last.
256    interval: Duration,
257    /// The deadline for when the payload builder job should resolve.
258    ///
259    /// By default this is [`SLOT_DURATION`]: 12s
260    deadline: Duration,
261    /// Maximum number of tasks to spawn for building a payload.
262    max_payload_tasks: usize,
263}
264
265// === impl BasicPayloadJobGeneratorConfig ===
266
267impl BasicPayloadJobGeneratorConfig {
268    /// Sets the interval at which the job should build a new payload after the last.
269    pub const fn interval(mut self, interval: Duration) -> Self {
270        self.interval = interval;
271        self
272    }
273
274    /// Sets the deadline when this job should resolve.
275    pub const fn deadline(mut self, deadline: Duration) -> Self {
276        self.deadline = deadline;
277        self
278    }
279
280    /// Sets the maximum number of tasks to spawn for building a payload(s).
281    ///
282    /// # Panics
283    ///
284    /// If `max_payload_tasks` is 0.
285    pub fn max_payload_tasks(mut self, max_payload_tasks: usize) -> Self {
286        assert!(max_payload_tasks > 0, "max_payload_tasks must be greater than 0");
287        self.max_payload_tasks = max_payload_tasks;
288        self
289    }
290}
291
292impl Default for BasicPayloadJobGeneratorConfig {
293    fn default() -> Self {
294        Self {
295            interval: Duration::from_secs(1),
296            // 12s slot time
297            deadline: SLOT_DURATION,
298            max_payload_tasks: 3,
299        }
300    }
301}
302
303/// A basic payload job that continuously builds a payload with the best transactions from the pool.
304///
305/// This type is a [`PayloadJob`] and [`Future`] that terminates when the deadline is reached or
306/// when the job is resolved: [`PayloadJob::resolve`].
307///
308/// This basic job implementation will trigger new payload build task continuously until the job is
309/// resolved or the deadline is reached, or until the built payload is marked as frozen:
310/// [`BuildOutcome::Freeze`]. Once a frozen payload is returned, no additional payloads will be
311/// built and this future will wait to be resolved: [`PayloadJob::resolve`] or terminated if the
312/// deadline is reached.
313#[derive(Debug)]
314pub struct BasicPayloadJob<Builder>
315where
316    Builder: PayloadBuilder,
317{
318    /// The configuration for how the payload will be created.
319    config: PayloadConfig<Builder::Attributes, HeaderForPayload<Builder::BuiltPayload>>,
320    /// How to spawn building tasks
321    executor: Runtime,
322    /// The deadline when this job should resolve.
323    deadline: Pin<Box<Sleep>>,
324    /// The interval at which the job should build a new payload after the last.
325    interval: Interval,
326    /// The best payload so far and its state.
327    best_payload: PayloadState<Builder::BuiltPayload>,
328    /// Receiver for the block that is currently being built.
329    pending_block: Option<PendingPayload<Builder::BuiltPayload>>,
330    /// Restricts how many generator tasks can be executed at once.
331    payload_task_guard: PayloadTaskGuard,
332    /// Caches all disk reads for the state the new payloads builds on
333    ///
334    /// This is used to avoid reading the same state over and over again when new attempts are
335    /// triggered, because during the building process we'll repeatedly execute the transactions.
336    cached_reads: Option<CachedReads>,
337    /// Optional execution cache shared with the engine.
338    execution_cache: Option<SavedCache>,
339    /// Optional state root task handle, shared with the engine.
340    trie_handle: Option<StateRootHandle>,
341    /// metrics for this type
342    metrics: PayloadBuilderMetrics,
343    /// The type responsible for building payloads.
344    ///
345    /// See [`PayloadBuilder`]
346    builder: Builder,
347}
348
349impl<Builder> BasicPayloadJob<Builder>
350where
351    Builder: PayloadBuilder + Unpin + 'static,
352    Builder::Attributes: Unpin + Clone,
353    Builder::BuiltPayload: Unpin + Clone,
354{
355    /// Spawns a new payload build task.
356    fn spawn_build_job(&mut self) {
357        trace!(target: "payload_builder", id = %self.config.payload_id(), "spawn new payload build task");
358        let (tx, rx) = oneshot::channel();
359        let cancel = CancelOnDrop::default();
360        let _cancel = cancel.clone();
361        let guard = self.payload_task_guard.clone();
362        let payload_config = self.config.clone();
363        let best_payload = self.best_payload.payload().cloned();
364        self.metrics.inc_initiated_payload_builds();
365        let cached_reads = self.cached_reads.take().unwrap_or_default();
366        let execution_cache = self.execution_cache.clone();
367        let trie_handle = self.trie_handle.take();
368        let builder = self.builder.clone();
369        let executor = self.executor.clone();
370        self.executor.spawn_task(async move {
371            // acquire the permit for executing the task
372            let permit = guard.acquire_owned().await;
373            executor.spawn_blocking_named_or_tokio(PAYLOAD_BUILDER_THREAD_NAME, move || {
374                let _permit = permit;
375                let args = BuildArguments {
376                    cached_reads,
377                    execution_cache,
378                    trie_handle,
379                    config: payload_config,
380                    cancel,
381                    best_payload,
382                };
383                let result = builder.try_build(args);
384                let _ = tx.send(result);
385            });
386        });
387
388        self.pending_block = Some(PendingPayload { _cancel, payload: rx });
389    }
390}
391
392impl<Builder> Future for BasicPayloadJob<Builder>
393where
394    Builder: PayloadBuilder + Unpin + 'static,
395    Builder::Attributes: Unpin + Clone,
396    Builder::BuiltPayload: Unpin + Clone,
397{
398    type Output = Result<(), PayloadBuilderError>;
399
400    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
401        let this = self.get_mut();
402
403        // check if the deadline is reached
404        if this.deadline.as_mut().poll(cx).is_ready() {
405            trace!(target: "payload_builder", "payload building deadline reached");
406            return Poll::Ready(Ok(()))
407        }
408
409        loop {
410            // Wait for any pending build to complete before polling the next tick.
411            //
412            // This avoids consuming interval ticks while a build is still in-flight,
413            // which would delay the follow-up build by a full interval even though
414            // the current attempt has already finished.
415            if let Some(mut fut) = this.pending_block.take() {
416                match fut.poll_unpin(cx) {
417                    Poll::Ready(Ok(outcome)) => match outcome {
418                        BuildOutcome::Better { payload, cached_reads } => {
419                            this.cached_reads = Some(cached_reads);
420                            debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
421                            this.best_payload = PayloadState::Best(payload);
422                        }
423                        BuildOutcome::Freeze(payload) => {
424                            debug!(target: "payload_builder", "payload frozen, no further building will occur");
425                            this.best_payload = PayloadState::Frozen(payload);
426                        }
427                        BuildOutcome::Aborted { fees, cached_reads } => {
428                            this.cached_reads = Some(cached_reads);
429                            trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
430                        }
431                        BuildOutcome::Cancelled => {
432                            unreachable!("the cancel signal never fired")
433                        }
434                    },
435                    Poll::Ready(Err(error)) => {
436                        // job failed, but we simply try again next interval
437                        debug!(target: "payload_builder", %error, "payload build attempt failed");
438                        this.metrics.inc_failed_payload_builds();
439                    }
440                    Poll::Pending => {
441                        this.pending_block = Some(fut);
442                        return Poll::Pending
443                    }
444                }
445            }
446
447            if this.best_payload.is_frozen() {
448                return Poll::Pending
449            }
450
451            // Wait for the next build interval tick.
452            //
453            // The loop is needed because `poll_tick` does not register a waker
454            // when it returns `Ready`, so we must loop back after spawning a job
455            // to reach a point that *does* register one (the pending block poll above).
456            ready!(this.interval.poll_tick(cx));
457            this.spawn_build_job()
458        }
459    }
460}
461
462impl<Builder> PayloadJob for BasicPayloadJob<Builder>
463where
464    Builder: PayloadBuilder + Unpin + 'static,
465    Builder::Attributes: Unpin + Clone,
466    Builder::BuiltPayload: Unpin + Clone,
467{
468    type PayloadAttributes = Builder::Attributes;
469    type ResolvePayloadFuture = ResolveBestPayload<Self::BuiltPayload>;
470    type BuiltPayload = Builder::BuiltPayload;
471
472    fn best_payload(&self) -> Result<Self::BuiltPayload, PayloadBuilderError> {
473        if let Some(payload) = self.best_payload.payload() {
474            Ok(payload.clone())
475        } else {
476            // No payload has been built yet, but we need to return something that the CL then
477            // can deliver, so we need to return an empty payload.
478            //
479            // Note: it is assumed that this is unlikely to happen, as the payload job is
480            // started right away and the first full block should have been
481            // built by the time CL is requesting the payload.
482            self.metrics.inc_requested_empty_payload();
483            self.builder.build_empty_payload(self.config.clone())
484        }
485    }
486
487    fn payload_attributes(&self) -> Result<Self::PayloadAttributes, PayloadBuilderError> {
488        Ok(self.config.attributes.clone())
489    }
490
491    fn payload_timestamp(&self) -> Result<u64, PayloadBuilderError> {
492        Ok(self.config.attributes.timestamp())
493    }
494
495    fn resolve_kind(
496        &mut self,
497        kind: PayloadKind,
498    ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
499        let best_payload = self.best_payload.payload().cloned();
500        if best_payload.is_none() && self.pending_block.is_none() {
501            // ensure we have a job scheduled if we don't have a best payload yet and none is active
502            self.spawn_build_job();
503        }
504
505        let maybe_better = self.pending_block.take();
506        let mut empty_payload = None;
507
508        if best_payload.is_none() {
509            debug!(target: "payload_builder", id=%self.config.payload_id(), "no best payload yet to resolve, building empty payload");
510
511            let args = BuildArguments {
512                cached_reads: self.cached_reads.take().unwrap_or_default(),
513                execution_cache: self.execution_cache.clone(),
514                trie_handle: None,
515                config: self.config.clone(),
516                cancel: CancelOnDrop::default(),
517                best_payload: None,
518            };
519
520            match self.builder.on_missing_payload(args) {
521                MissingPayloadBehaviour::AwaitInProgress => {
522                    debug!(target: "payload_builder", id=%self.config.payload_id(), "awaiting in progress payload build job");
523                }
524                MissingPayloadBehaviour::RaceEmptyPayload => {
525                    debug!(target: "payload_builder", id=%self.config.payload_id(), "racing empty payload");
526
527                    // if no payload has been built yet
528                    self.metrics.inc_requested_empty_payload();
529                    // no payload built yet, so we need to return an empty payload
530                    let (tx, rx) = oneshot::channel();
531                    let config = self.config.clone();
532                    let builder = self.builder.clone();
533                    self.executor.spawn_blocking_named_or_tokio(
534                        PAYLOAD_BUILDER_THREAD_NAME,
535                        move || {
536                            let res = builder.build_empty_payload(config);
537                            let _ = tx.send(res);
538                        },
539                    );
540
541                    empty_payload = Some(rx);
542                }
543                MissingPayloadBehaviour::RacePayload(job) => {
544                    debug!(target: "payload_builder", id=%self.config.payload_id(), "racing fallback payload");
545                    // race the in progress job with this job
546                    let (tx, rx) = oneshot::channel();
547                    self.executor.spawn_blocking_named_or_tokio(
548                        PAYLOAD_BUILDER_THREAD_NAME,
549                        move || {
550                            let _ = tx.send(job());
551                        },
552                    );
553                    empty_payload = Some(rx);
554                }
555            };
556        }
557
558        let fut = ResolveBestPayload {
559            best_payload,
560            maybe_better,
561            empty_payload: empty_payload.filter(|_| kind != PayloadKind::WaitForPending),
562        };
563
564        (fut, KeepPayloadJobAlive::No)
565    }
566}
567
568/// Represents the current state of a payload being built.
569#[derive(Debug, Clone)]
570pub enum PayloadState<P> {
571    /// No payload has been built yet.
572    Missing,
573    /// The best payload built so far, which may still be improved upon.
574    Best(P),
575    /// The payload is frozen and no further building should occur.
576    ///
577    /// Contains the final payload `P` that should be used.
578    Frozen(P),
579}
580
581impl<P> PayloadState<P> {
582    /// Checks if the payload is frozen.
583    pub const fn is_frozen(&self) -> bool {
584        matches!(self, Self::Frozen(_))
585    }
586
587    /// Returns the payload if it exists (either Best or Frozen).
588    pub const fn payload(&self) -> Option<&P> {
589        match self {
590            Self::Missing => None,
591            Self::Best(p) | Self::Frozen(p) => Some(p),
592        }
593    }
594}
595
596/// The future that returns the best payload to be served to the consensus layer.
597///
598/// This returns the payload that's supposed to be sent to the CL.
599///
600/// If payload has been built so far, it will return that, but it will check if there's a better
601/// payload available from an in progress build job. If so it will return that.
602///
603/// If no payload has been built so far, it will either return an empty payload or the result of the
604/// in progress build job, whatever finishes first.
605#[derive(Debug)]
606pub struct ResolveBestPayload<Payload> {
607    /// Best payload so far.
608    pub best_payload: Option<Payload>,
609    /// Regular payload job that's currently running that might produce a better payload.
610    pub maybe_better: Option<PendingPayload<Payload>>,
611    /// The empty payload building job in progress, if any.
612    pub empty_payload: Option<oneshot::Receiver<Result<Payload, PayloadBuilderError>>>,
613}
614
615impl<Payload> ResolveBestPayload<Payload> {
616    const fn is_empty(&self) -> bool {
617        self.best_payload.is_none() && self.maybe_better.is_none() && self.empty_payload.is_none()
618    }
619}
620
621impl<Payload> Future for ResolveBestPayload<Payload>
622where
623    Payload: Unpin,
624{
625    type Output = Result<Payload, PayloadBuilderError>;
626
627    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
628        let this = self.get_mut();
629
630        // check if there is a better payload before returning the best payload
631        if let Some(fut) = Pin::new(&mut this.maybe_better).as_pin_mut() &&
632            let Poll::Ready(res) = fut.poll(cx)
633        {
634            this.maybe_better = None;
635            if let Ok(Some(payload)) = res.map(|out| out.into_payload()).inspect_err(
636                |err| warn!(target: "payload_builder", %err, "failed to resolve pending payload"),
637            ) {
638                debug!(target: "payload_builder", "resolving better payload");
639                return Poll::Ready(Ok(payload))
640            }
641        }
642
643        if let Some(best) = this.best_payload.take() {
644            debug!(target: "payload_builder", "resolving best payload");
645            return Poll::Ready(Ok(best))
646        }
647
648        if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() &&
649            let Poll::Ready(res) = fut.poll(cx)
650        {
651            this.empty_payload = None;
652            return match res {
653                Ok(res) => {
654                    if let Err(err) = &res {
655                        warn!(target: "payload_builder", %err, "failed to resolve empty payload");
656                    } else {
657                        debug!(target: "payload_builder", "resolving empty payload");
658                    }
659                    Poll::Ready(res)
660                }
661                Err(err) => Poll::Ready(Err(err.into())),
662            }
663        }
664
665        if this.is_empty() {
666            return Poll::Ready(Err(PayloadBuilderError::MissingPayload))
667        }
668
669        Poll::Pending
670    }
671}
672
673/// A future that resolves to the result of the block building job.
674#[derive(Debug)]
675pub struct PendingPayload<P> {
676    /// The marker to cancel the job on drop
677    _cancel: CancelOnDrop,
678    /// The channel to send the result to.
679    payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
680}
681
682impl<P> PendingPayload<P> {
683    /// Constructs a `PendingPayload` future.
684    pub const fn new(
685        cancel: CancelOnDrop,
686        payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
687    ) -> Self {
688        Self { _cancel: cancel, payload }
689    }
690}
691
692impl<P> Future for PendingPayload<P> {
693    type Output = Result<BuildOutcome<P>, PayloadBuilderError>;
694
695    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
696        let res = ready!(self.payload.poll_unpin(cx));
697        Poll::Ready(res.map_err(Into::into).and_then(|res| res))
698    }
699}
700
701/// Static config for how to build a payload.
702#[derive(Clone, Debug)]
703pub struct PayloadConfig<Attributes, Header = alloy_consensus::Header> {
704    /// The parent header.
705    pub parent_header: Arc<SealedHeader<Header>>,
706    /// Requested attributes for the payload.
707    pub attributes: Attributes,
708    /// The payload id.
709    pub payload_id: PayloadId,
710}
711
712impl<Attributes, Header> PayloadConfig<Attributes, Header>
713where
714    Attributes: PayloadAttributes,
715{
716    /// Create new payload config.
717    pub const fn new(
718        parent_header: Arc<SealedHeader<Header>>,
719        attributes: Attributes,
720        payload_id: PayloadId,
721    ) -> Self {
722        Self { parent_header, attributes, payload_id }
723    }
724
725    /// Returns the payload id.
726    pub const fn payload_id(&self) -> PayloadId {
727        self.payload_id
728    }
729}
730
731/// The possible outcomes of a payload building attempt.
732#[derive(Debug)]
733pub enum BuildOutcome<Payload> {
734    /// Successfully built a better block.
735    Better {
736        /// The new payload that was built.
737        payload: Payload,
738        /// The cached reads that were used to build the payload.
739        cached_reads: CachedReads,
740    },
741    /// Aborted payload building because resulted in worse block wrt. fees.
742    Aborted {
743        /// The total fees associated with the attempted payload.
744        fees: U256,
745        /// The cached reads that were used to build the payload.
746        cached_reads: CachedReads,
747    },
748    /// Build job was cancelled
749    Cancelled,
750
751    /// The payload is final and no further building should occur
752    Freeze(Payload),
753}
754
755impl<Payload> BuildOutcome<Payload> {
756    /// Consumes the type and returns the payload if the outcome is `Better` or `Freeze`.
757    pub fn into_payload(self) -> Option<Payload> {
758        match self {
759            Self::Better { payload, .. } | Self::Freeze(payload) => Some(payload),
760            _ => None,
761        }
762    }
763
764    /// Consumes the type and returns the payload if the outcome is `Better` or `Freeze`.
765    pub const fn payload(&self) -> Option<&Payload> {
766        match self {
767            Self::Better { payload, .. } | Self::Freeze(payload) => Some(payload),
768            _ => None,
769        }
770    }
771
772    /// Returns true if the outcome is `Better`.
773    pub const fn is_better(&self) -> bool {
774        matches!(self, Self::Better { .. })
775    }
776
777    /// Returns true if the outcome is `Freeze`.
778    pub const fn is_frozen(&self) -> bool {
779        matches!(self, Self::Freeze { .. })
780    }
781
782    /// Returns true if the outcome is `Aborted`.
783    pub const fn is_aborted(&self) -> bool {
784        matches!(self, Self::Aborted { .. })
785    }
786
787    /// Returns true if the outcome is `Cancelled`.
788    pub const fn is_cancelled(&self) -> bool {
789        matches!(self, Self::Cancelled)
790    }
791
792    /// Applies a fn on the current payload.
793    pub fn map_payload<F, P>(self, f: F) -> BuildOutcome<P>
794    where
795        F: FnOnce(Payload) -> P,
796    {
797        match self {
798            Self::Better { payload, cached_reads } => {
799                BuildOutcome::Better { payload: f(payload), cached_reads }
800            }
801            Self::Aborted { fees, cached_reads } => BuildOutcome::Aborted { fees, cached_reads },
802            Self::Cancelled => BuildOutcome::Cancelled,
803            Self::Freeze(payload) => BuildOutcome::Freeze(f(payload)),
804        }
805    }
806}
807
808/// The possible outcomes of a payload building attempt without reused [`CachedReads`]
809#[derive(Debug)]
810pub enum BuildOutcomeKind<Payload> {
811    /// Successfully built a better block.
812    Better {
813        /// The new payload that was built.
814        payload: Payload,
815    },
816    /// Aborted payload building because resulted in worse block wrt. fees.
817    Aborted {
818        /// The total fees associated with the attempted payload.
819        fees: U256,
820    },
821    /// Build job was cancelled
822    Cancelled,
823    /// The payload is final and no further building should occur
824    Freeze(Payload),
825}
826
827impl<Payload> BuildOutcomeKind<Payload> {
828    /// Attaches the [`CachedReads`] to the outcome.
829    pub fn with_cached_reads(self, cached_reads: CachedReads) -> BuildOutcome<Payload> {
830        match self {
831            Self::Better { payload } => BuildOutcome::Better { payload, cached_reads },
832            Self::Aborted { fees } => BuildOutcome::Aborted { fees, cached_reads },
833            Self::Cancelled => BuildOutcome::Cancelled,
834            Self::Freeze(payload) => BuildOutcome::Freeze(payload),
835        }
836    }
837}
838
839/// A collection of arguments used for building payloads.
840///
841/// This struct encapsulates the essential components and configuration required for the payload
842/// building process. It holds references to the Ethereum client, transaction pool, cached reads,
843/// payload configuration, cancellation status, and the best payload achieved so far.
844#[derive(Debug)]
845pub struct BuildArguments<Attributes, Payload: BuiltPayload> {
846    /// Previously cached disk reads
847    pub cached_reads: CachedReads,
848    /// Optional execution cache shared with the engine.
849    pub execution_cache: Option<SavedCache>,
850    /// Optional state root task handle, shared with the engine.
851    ///
852    /// The preserved trie is shared with the engine, so a concurrent `newPayload` will
853    /// block until this task completes. The trie is anchored at the built block's state
854    /// root, so if the next `newPayload` is not on top of that block, the trie cache is
855    /// invalidated and cleared.
856    pub trie_handle: Option<StateRootHandle>,
857    /// How to configure the payload.
858    pub config: PayloadConfig<Attributes, HeaderTy<Payload::Primitives>>,
859    /// A marker that can be used to cancel the job.
860    pub cancel: CancelOnDrop,
861    /// The best payload achieved so far.
862    pub best_payload: Option<Payload>,
863}
864
865impl<Attributes, Payload: BuiltPayload> BuildArguments<Attributes, Payload> {
866    /// Create new build arguments.
867    pub const fn new(
868        cached_reads: CachedReads,
869        execution_cache: Option<SavedCache>,
870        trie_handle: Option<StateRootHandle>,
871        config: PayloadConfig<Attributes, HeaderTy<Payload::Primitives>>,
872        cancel: CancelOnDrop,
873        best_payload: Option<Payload>,
874    ) -> Self {
875        Self { cached_reads, execution_cache, trie_handle, config, cancel, best_payload }
876    }
877}
878
879/// A trait for building payloads that encapsulate Ethereum transactions.
880///
881/// This trait provides the `try_build` method to construct a transaction payload
882/// using `BuildArguments`. It returns a `Result` indicating success or a
883/// `PayloadBuilderError` if building fails.
884///
885/// Generic parameters `Pool` and `Client` represent the transaction pool and
886/// Ethereum client types.
887pub trait PayloadBuilder: Send + Sync + Clone {
888    /// The payload attributes type to accept for building.
889    type Attributes: PayloadAttributes;
890    /// The type of the built payload.
891    type BuiltPayload: BuiltPayload;
892
893    /// Tries to build a transaction payload using provided arguments.
894    ///
895    /// Constructs a transaction payload based on the given arguments,
896    /// returning a `Result` indicating success or an error if building fails.
897    ///
898    /// # Arguments
899    ///
900    /// - `args`: Build arguments containing necessary components.
901    ///
902    /// # Returns
903    ///
904    /// A `Result` indicating the build outcome or an error.
905    fn try_build(
906        &self,
907        args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
908    ) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError>;
909
910    /// Invoked when the payload job is being resolved and there is no payload yet.
911    ///
912    /// This can happen if the CL requests a payload before the first payload has been built.
913    fn on_missing_payload(
914        &self,
915        _args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
916    ) -> MissingPayloadBehaviour<Self::BuiltPayload> {
917        MissingPayloadBehaviour::RaceEmptyPayload
918    }
919
920    /// Builds an empty payload without any transaction.
921    fn build_empty_payload(
922        &self,
923        config: PayloadConfig<Self::Attributes, HeaderForPayload<Self::BuiltPayload>>,
924    ) -> Result<Self::BuiltPayload, PayloadBuilderError>;
925}
926
927/// Tells the payload builder how to react to payload request if there's no payload available yet.
928///
929/// This situation can occur if the CL requests a payload before the first payload has been built.
930#[derive(Default)]
931pub enum MissingPayloadBehaviour<Payload> {
932    /// Await the regular scheduled payload process.
933    AwaitInProgress,
934    /// Race the in progress payload process with an empty payload.
935    #[default]
936    RaceEmptyPayload,
937    /// Race the in progress payload process with this job.
938    RacePayload(Box<dyn FnOnce() -> Result<Payload, PayloadBuilderError> + Send>),
939}
940
941impl<Payload> fmt::Debug for MissingPayloadBehaviour<Payload> {
942    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
943        match self {
944            Self::AwaitInProgress => write!(f, "AwaitInProgress"),
945            Self::RaceEmptyPayload => {
946                write!(f, "RaceEmptyPayload")
947            }
948            Self::RacePayload(_) => write!(f, "RacePayload"),
949        }
950    }
951}
952
953/// Checks if the new payload is better than the current best.
954///
955/// This compares the total fees of the blocks, higher is better.
956#[inline(always)]
957pub fn is_better_payload<T: BuiltPayload>(best_payload: Option<&T>, new_fees: U256) -> bool {
958    if let Some(best_payload) = best_payload {
959        new_fees > best_payload.fees()
960    } else {
961        true
962    }
963}
964
965/// Returns the duration until the given unix timestamp in seconds.
966///
967/// Returns `Duration::ZERO` if the given timestamp is in the past.
968fn duration_until(unix_timestamp_secs: u64) -> Duration {
969    let unix_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
970    let timestamp = Duration::from_secs(unix_timestamp_secs);
971    timestamp.saturating_sub(unix_now)
972}