Skip to main content

reth_basic_payload_builder/
lib.rs

1//! A basic payload generator for reth.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use crate::metrics::PayloadBuilderMetrics;
12use alloy_eips::merge::SLOT_DURATION;
13use alloy_primitives::{B256, U256};
14use futures_core::ready;
15use futures_util::FutureExt;
16use reth_chain_state::CanonStateNotification;
17use reth_execution_cache::SavedCache;
18use reth_payload_builder::{
19    BuildNewPayload, KeepPayloadJobAlive, PayloadId, PayloadJob, PayloadJobGenerator,
20};
21use reth_payload_builder_primitives::PayloadBuilderError;
22use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind};
23use reth_primitives_traits::{HeaderTy, NodePrimitives, SealedHeader};
24use reth_revm::{cached::CachedReads, cancelled::CancelOnDrop};
25use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
26use reth_tasks::Runtime;
27use reth_trie_parallel::state_root_task::StateRootHandle;
28use std::{
29    fmt,
30    future::Future,
31    ops::Deref,
32    pin::Pin,
33    sync::Arc,
34    task::{Context, Poll},
35    time::{Duration, SystemTime, UNIX_EPOCH},
36};
37use tokio::{
38    sync::{oneshot, Semaphore},
39    time::{Interval, Sleep},
40};
41use tracing::{debug, trace, warn};
42
43mod better_payload_emitter;
44mod metrics;
45mod stack;
46
47pub use better_payload_emitter::BetterPayloadEmitter;
48pub use stack::PayloadBuilderStack;
49
50const PAYLOAD_BUILDER_THREAD_NAME: &str = "payload-builder";
51
52/// Helper to access [`NodePrimitives::BlockHeader`] from [`PayloadBuilder::BuiltPayload`].
53pub type HeaderForPayload<P> = <<P as BuiltPayload>::Primitives as NodePrimitives>::BlockHeader;
54
55/// The [`PayloadJobGenerator`] that creates [`BasicPayloadJob`]s.
56#[derive(Debug)]
57pub struct BasicPayloadJobGenerator<Client, Builder> {
58    /// The client that can interact with the chain.
59    client: Client,
60    /// The task executor to spawn payload building tasks on.
61    executor: Runtime,
62    /// The configuration for the job generator.
63    config: BasicPayloadJobGeneratorConfig,
64    /// Restricts how many generator tasks can be executed at once.
65    payload_task_guard: PayloadTaskGuard,
66    /// The type responsible for building payloads.
67    ///
68    /// See [`PayloadBuilder`]
69    builder: Builder,
70    /// Stored `cached_reads` for new payload jobs.
71    pre_cached: Option<PrecachedState>,
72    /// Stored parent block information for new payload jobs.
73    pre_cached_parent_block_info: Option<PrecachedParentBlockInfo>,
74}
75
76// === impl BasicPayloadJobGenerator ===
77
78impl<Client, Builder> BasicPayloadJobGenerator<Client, Builder> {
79    /// Creates a new [`BasicPayloadJobGenerator`] with the given config and custom
80    /// [`PayloadBuilder`]
81    pub fn with_builder(
82        client: Client,
83        executor: Runtime,
84        config: BasicPayloadJobGeneratorConfig,
85        builder: Builder,
86    ) -> Self {
87        Self {
88            client,
89            executor,
90            payload_task_guard: PayloadTaskGuard::new(config.max_payload_tasks),
91            config,
92            builder,
93            pre_cached: None,
94            pre_cached_parent_block_info: None,
95        }
96    }
97
98    /// Returns the maximum duration a job should be allowed to run.
99    ///
100    /// This adheres to the following specification:
101    /// > Client software SHOULD stop the updating process when either a call to engine_getPayload
102    /// > with the build process's payloadId is made or SECONDS_PER_SLOT (12s in the Mainnet
103    /// > configuration) have passed since the point in time identified by the timestamp parameter.
104    ///
105    /// See also <https://github.com/ethereum/execution-apis/blob/431cf72fd3403d946ca3e3afc36b973fc87e0e89/src/engine/paris.md?plain=1#L137>
106    #[inline]
107    fn max_job_duration(&self, unix_timestamp: u64) -> Duration {
108        let duration_until_timestamp = duration_until(unix_timestamp);
109
110        // safety in case clocks are bad
111        let duration_until_timestamp = duration_until_timestamp.min(self.config.deadline * 3);
112
113        self.config.deadline + duration_until_timestamp
114    }
115
116    /// Returns the [Instant](tokio::time::Instant) at which the job should be terminated because it
117    /// is considered timed out.
118    #[inline]
119    fn job_deadline(&self, unix_timestamp: u64) -> tokio::time::Instant {
120        tokio::time::Instant::now() + self.max_job_duration(unix_timestamp)
121    }
122
123    /// Returns a reference to the tasks type
124    pub const fn tasks(&self) -> &Runtime {
125        &self.executor
126    }
127
128    /// Returns the pre-cached reads for the given parent header if it matches the cached state's
129    /// block.
130    fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
131        if !self.config.pre_cache_state {
132            return None
133        }
134
135        self.pre_cached.as_ref().filter(|pc| pc.block == parent).map(|pc| pc.cached.clone())
136    }
137
138    /// Returns the cached parent block information if it matches the requested parent.
139    fn maybe_parent_block_info(&self, parent: B256) -> Option<PayloadParentBlockInfo> {
140        self.pre_cached_parent_block_info
141            .as_ref()
142            .filter(|info| info.block == parent)
143            .map(|info| info.parent_block_info)
144    }
145}
146
147// === impl BasicPayloadJobGenerator ===
148
149impl<Client, Builder> PayloadJobGenerator for BasicPayloadJobGenerator<Client, Builder>
150where
151    Client: StateProviderFactory
152        + BlockReaderIdExt<Header = HeaderForPayload<Builder::BuiltPayload>>
153        + Clone
154        + Unpin
155        + 'static,
156    Builder: PayloadBuilder + Unpin + 'static,
157    Builder::Attributes: Unpin + Clone,
158    Builder::BuiltPayload: Unpin + Clone,
159{
160    type Job = BasicPayloadJob<Builder>;
161
162    fn new_payload_job(
163        &self,
164        input: BuildNewPayload<Builder::Attributes>,
165        id: PayloadId,
166    ) -> Result<Self::Job, PayloadBuilderError> {
167        let parent_header = if input.parent_hash.is_zero() {
168            // Use latest header for genesis block case
169            self.client
170                .latest_header()
171                .map_err(PayloadBuilderError::from)?
172                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(B256::ZERO))?
173        } else {
174            // Fetch specific header by hash
175            self.client
176                .sealed_header_by_hash(input.parent_hash)
177                .map_err(PayloadBuilderError::from)?
178                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(input.parent_hash))?
179        };
180
181        let parent_hash = parent_header.hash();
182        let cached_reads = self.maybe_pre_cached(parent_hash);
183        let parent_block_info = self.maybe_parent_block_info(parent_hash);
184
185        let config = PayloadConfig::new(Arc::new(parent_header), input.attributes, id)
186            .with_parent_block_info(parent_block_info);
187
188        let until = self.job_deadline(config.attributes.timestamp());
189        let deadline = Box::pin(tokio::time::sleep_until(until));
190
191        let mut job = BasicPayloadJob {
192            config,
193            executor: self.executor.clone(),
194            deadline,
195            // ticks immediately
196            interval: tokio::time::interval(self.config.interval),
197            best_payload: PayloadState::Missing,
198            pending_block: None,
199            cached_reads,
200            execution_cache: input.cache,
201            trie_handle: input.trie_handle,
202            payload_task_guard: self.payload_task_guard.clone(),
203            metrics: Default::default(),
204            builder: self.builder.clone(),
205        };
206
207        // start the first job right away
208        job.spawn_build_job();
209
210        Ok(job)
211    }
212
213    fn on_new_state<N: NodePrimitives>(&mut self, new_state: CanonStateNotification<N>) {
214        if !self.config.pre_cache_state {
215            self.pre_cached = None;
216            return
217        }
218
219        let mut cached = CachedReads::default();
220
221        // extract the state from the notification and put it into the cache
222        let committed = new_state.committed();
223        let new_execution_outcome = committed.execution_outcome();
224        for (addr, acc) in new_execution_outcome.bundle_accounts_iter() {
225            if let Some(info) = acc.info.clone() {
226                // we want pre cache existing accounts and their storage
227                // this only includes changed accounts and storage but is better than nothing
228                let storage =
229                    acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
230                cached.insert_account(addr, info, storage);
231            }
232        }
233
234        let tip = committed.tip();
235        let block = tip.hash();
236        let parent_block_info =
237            PayloadParentBlockInfo { transaction_count: tip.transaction_count() };
238
239        self.pre_cached = Some(PrecachedState { block, cached });
240        self.pre_cached_parent_block_info =
241            Some(PrecachedParentBlockInfo { block, parent_block_info });
242    }
243}
244
245/// Pre-filled [`CachedReads`] for a specific block.
246///
247/// This is extracted from the [`CanonStateNotification`] for the tip block.
248#[derive(Debug, Clone)]
249pub struct PrecachedState {
250    /// The block for which the state is pre-cached.
251    pub block: B256,
252    /// Cached state for the block.
253    pub cached: CachedReads,
254}
255
256/// Pre-filled parent block information for a specific block.
257#[derive(Debug, Clone, Copy)]
258struct PrecachedParentBlockInfo {
259    /// The block for which the parent block information is cached.
260    block: B256,
261    /// Cached parent block information.
262    parent_block_info: PayloadParentBlockInfo,
263}
264
265/// Restricts how many generator tasks can be executed at once.
266#[derive(Debug, Clone)]
267pub struct PayloadTaskGuard(Arc<Semaphore>);
268
269impl Deref for PayloadTaskGuard {
270    type Target = Semaphore;
271
272    fn deref(&self) -> &Self::Target {
273        &self.0
274    }
275}
276
277// === impl PayloadTaskGuard ===
278
279impl PayloadTaskGuard {
280    /// Constructs `Self` with a maximum task count of `max_payload_tasks`.
281    pub fn new(max_payload_tasks: usize) -> Self {
282        Self(Arc::new(Semaphore::new(max_payload_tasks)))
283    }
284
285    /// Acquires an owned permit for a payload build task.
286    async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
287        self.0.clone().acquire_owned().await.expect("payload task semaphore closed")
288    }
289}
290
291/// Settings for the [`BasicPayloadJobGenerator`].
292#[derive(Debug, Clone)]
293pub struct BasicPayloadJobGeneratorConfig {
294    /// The interval at which the job should build a new payload after the last.
295    interval: Duration,
296    /// The deadline for when the payload builder job should resolve.
297    ///
298    /// By default this is [`SLOT_DURATION`]: 12s
299    deadline: Duration,
300    /// Maximum number of tasks to spawn for building a payload.
301    max_payload_tasks: usize,
302    /// Whether to pre-cache changed state from canonical state notifications.
303    pre_cache_state: bool,
304}
305
306// === impl BasicPayloadJobGeneratorConfig ===
307
308impl BasicPayloadJobGeneratorConfig {
309    /// Sets the interval at which the job should build a new payload after the last.
310    pub const fn interval(mut self, interval: Duration) -> Self {
311        self.interval = interval;
312        self
313    }
314
315    /// Sets the deadline when this job should resolve.
316    pub const fn deadline(mut self, deadline: Duration) -> Self {
317        self.deadline = deadline;
318        self
319    }
320
321    /// Sets the maximum number of tasks to spawn for building a payload(s).
322    ///
323    /// # Panics
324    ///
325    /// If `max_payload_tasks` is 0.
326    pub fn max_payload_tasks(mut self, max_payload_tasks: usize) -> Self {
327        assert!(max_payload_tasks > 0, "max_payload_tasks must be greater than 0");
328        self.max_payload_tasks = max_payload_tasks;
329        self
330    }
331
332    /// Sets whether to pre-cache changed state from canonical state notifications.
333    ///
334    /// This keeps the parent block's state changes in memory so payload jobs building on top of it
335    /// can reuse those reads.
336    pub const fn pre_cache_state(mut self, pre_cache_state: bool) -> Self {
337        self.pre_cache_state = pre_cache_state;
338        self
339    }
340}
341
342impl Default for BasicPayloadJobGeneratorConfig {
343    fn default() -> Self {
344        Self {
345            interval: Duration::from_secs(1),
346            // 12s slot time
347            deadline: SLOT_DURATION,
348            max_payload_tasks: 3,
349            pre_cache_state: true,
350        }
351    }
352}
353
354/// A basic payload job that continuously builds a payload with the best transactions from the pool.
355///
356/// This type is a [`PayloadJob`] and [`Future`] that terminates when the deadline is reached or
357/// when the job is resolved: [`PayloadJob::resolve`].
358///
359/// This basic job implementation will trigger new payload build task continuously until the job is
360/// resolved or the deadline is reached, or until the built payload is marked as frozen:
361/// [`BuildOutcome::Freeze`]. Once a frozen payload is returned, no additional payloads will be
362/// built and this future will wait to be resolved: [`PayloadJob::resolve`] or terminated if the
363/// deadline is reached.
364#[derive(Debug)]
365pub struct BasicPayloadJob<Builder>
366where
367    Builder: PayloadBuilder,
368{
369    /// The configuration for how the payload will be created.
370    config: PayloadConfig<Builder::Attributes, HeaderForPayload<Builder::BuiltPayload>>,
371    /// How to spawn building tasks
372    executor: Runtime,
373    /// The deadline when this job should resolve.
374    deadline: Pin<Box<Sleep>>,
375    /// The interval at which the job should build a new payload after the last.
376    interval: Interval,
377    /// The best payload so far and its state.
378    best_payload: PayloadState<Builder::BuiltPayload>,
379    /// Receiver for the block that is currently being built.
380    pending_block: Option<PendingPayload<Builder::BuiltPayload>>,
381    /// Restricts how many generator tasks can be executed at once.
382    payload_task_guard: PayloadTaskGuard,
383    /// Caches all disk reads for the state the new payloads builds on
384    ///
385    /// This is used to avoid reading the same state over and over again when new attempts are
386    /// triggered, because during the building process we'll repeatedly execute the transactions.
387    cached_reads: Option<CachedReads>,
388    /// Optional execution cache shared with the engine.
389    execution_cache: Option<SavedCache>,
390    /// Optional state root task handle, shared with the engine.
391    trie_handle: Option<StateRootHandle>,
392    /// metrics for this type
393    metrics: PayloadBuilderMetrics,
394    /// The type responsible for building payloads.
395    ///
396    /// See [`PayloadBuilder`]
397    builder: Builder,
398}
399
400impl<Builder> BasicPayloadJob<Builder>
401where
402    Builder: PayloadBuilder + Unpin + 'static,
403    Builder::Attributes: Unpin + Clone,
404    Builder::BuiltPayload: Unpin + Clone,
405{
406    /// Spawns a new payload build task.
407    fn spawn_build_job(&mut self) {
408        trace!(target: "payload_builder", id = %self.config.payload_id(), "spawn new payload build task");
409        let (tx, rx) = oneshot::channel();
410        let cancel = CancelOnDrop::default();
411        let _cancel = cancel.clone();
412        let guard = self.payload_task_guard.clone();
413        let payload_config = self.config.clone();
414        let best_payload = self.best_payload.payload().cloned();
415        self.metrics.inc_initiated_payload_builds();
416        let cached_reads = self.cached_reads.take().unwrap_or_default();
417        let execution_cache = self.execution_cache.clone();
418        let trie_handle = self.trie_handle.take();
419        let builder = self.builder.clone();
420        let executor = self.executor.clone();
421        self.executor.spawn_task(async move {
422            // acquire the permit for executing the task
423            let permit = guard.acquire_owned().await;
424            executor.spawn_blocking_named_or_tokio(PAYLOAD_BUILDER_THREAD_NAME, move || {
425                let _permit = permit;
426                let args = BuildArguments {
427                    cached_reads,
428                    execution_cache,
429                    trie_handle,
430                    config: payload_config,
431                    cancel,
432                    best_payload,
433                };
434                let result = builder.try_build(args);
435                let _ = tx.send(result);
436            });
437        });
438
439        self.pending_block = Some(PendingPayload { _cancel, payload: rx });
440    }
441}
442
443impl<Builder> Future for BasicPayloadJob<Builder>
444where
445    Builder: PayloadBuilder + Unpin + 'static,
446    Builder::Attributes: Unpin + Clone,
447    Builder::BuiltPayload: Unpin + Clone,
448{
449    type Output = Result<(), PayloadBuilderError>;
450
451    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
452        let this = self.get_mut();
453
454        // check if the deadline is reached
455        if this.deadline.as_mut().poll(cx).is_ready() {
456            trace!(target: "payload_builder", "payload building deadline reached");
457            return Poll::Ready(Ok(()))
458        }
459
460        loop {
461            // Wait for any pending build to complete before polling the next tick.
462            //
463            // This avoids consuming interval ticks while a build is still in-flight,
464            // which would delay the follow-up build by a full interval even though
465            // the current attempt has already finished.
466            if let Some(mut fut) = this.pending_block.take() {
467                match fut.poll_unpin(cx) {
468                    Poll::Ready(Ok(outcome)) => match outcome {
469                        BuildOutcome::Better { payload, cached_reads } => {
470                            this.cached_reads = Some(cached_reads);
471                            debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
472                            this.best_payload = PayloadState::Best(payload);
473                        }
474                        BuildOutcome::Freeze(payload) => {
475                            debug!(target: "payload_builder", "payload frozen, no further building will occur");
476                            this.best_payload = PayloadState::Frozen(payload);
477                        }
478                        BuildOutcome::Aborted { fees, cached_reads } => {
479                            this.cached_reads = Some(cached_reads);
480                            trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
481                        }
482                        BuildOutcome::Cancelled => {
483                            unreachable!("the cancel signal never fired")
484                        }
485                    },
486                    Poll::Ready(Err(error)) => {
487                        // job failed, but we simply try again next interval
488                        debug!(target: "payload_builder", %error, "payload build attempt failed");
489                        this.metrics.inc_failed_payload_builds();
490                    }
491                    Poll::Pending => {
492                        this.pending_block = Some(fut);
493                        return Poll::Pending
494                    }
495                }
496            }
497
498            if this.best_payload.is_frozen() {
499                return Poll::Pending
500            }
501
502            // Wait for the next build interval tick.
503            //
504            // The loop is needed because `poll_tick` does not register a waker
505            // when it returns `Ready`, so we must loop back after spawning a job
506            // to reach a point that *does* register one (the pending block poll above).
507            ready!(this.interval.poll_tick(cx));
508            this.spawn_build_job()
509        }
510    }
511}
512
513impl<Builder> PayloadJob for BasicPayloadJob<Builder>
514where
515    Builder: PayloadBuilder + Unpin + 'static,
516    Builder::Attributes: Unpin + Clone,
517    Builder::BuiltPayload: Unpin + Clone,
518{
519    type PayloadAttributes = Builder::Attributes;
520    type ResolvePayloadFuture = ResolveBestPayload<Self::BuiltPayload>;
521    type BuiltPayload = Builder::BuiltPayload;
522
523    fn best_payload(&self) -> Result<Self::BuiltPayload, PayloadBuilderError> {
524        if let Some(payload) = self.best_payload.payload() {
525            Ok(payload.clone())
526        } else {
527            // No payload has been built yet, but we need to return something that the CL then
528            // can deliver, so we need to return an empty payload.
529            //
530            // Note: it is assumed that this is unlikely to happen, as the payload job is
531            // started right away and the first full block should have been
532            // built by the time CL is requesting the payload.
533            self.metrics.inc_requested_empty_payload();
534            self.builder.build_empty_payload(self.config.clone())
535        }
536    }
537
538    fn payload_attributes(&self) -> Result<Self::PayloadAttributes, PayloadBuilderError> {
539        Ok(self.config.attributes.clone())
540    }
541
542    fn payload_timestamp(&self) -> Result<u64, PayloadBuilderError> {
543        Ok(self.config.attributes.timestamp())
544    }
545
546    fn resolve_kind(
547        &mut self,
548        kind: PayloadKind,
549    ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
550        let best_payload = self.best_payload.payload().cloned();
551        if best_payload.is_none() && self.pending_block.is_none() {
552            // ensure we have a job scheduled if we don't have a best payload yet and none is active
553            self.spawn_build_job();
554        }
555
556        let maybe_better = self.pending_block.take();
557        let mut empty_payload = None;
558
559        if best_payload.is_none() {
560            debug!(target: "payload_builder", id=%self.config.payload_id(), "no best payload yet to resolve, building empty payload");
561
562            let args = BuildArguments {
563                cached_reads: self.cached_reads.take().unwrap_or_default(),
564                execution_cache: self.execution_cache.clone(),
565                trie_handle: None,
566                config: self.config.clone(),
567                cancel: CancelOnDrop::default(),
568                best_payload: None,
569            };
570
571            match self.builder.on_missing_payload(args) {
572                MissingPayloadBehaviour::AwaitInProgress => {
573                    debug!(target: "payload_builder", id=%self.config.payload_id(), "awaiting in progress payload build job");
574                }
575                MissingPayloadBehaviour::RaceEmptyPayload => {
576                    debug!(target: "payload_builder", id=%self.config.payload_id(), "racing empty payload");
577
578                    // if no payload has been built yet
579                    self.metrics.inc_requested_empty_payload();
580                    // no payload built yet, so we need to return an empty payload
581                    let (tx, rx) = oneshot::channel();
582                    let config = self.config.clone();
583                    let builder = self.builder.clone();
584                    self.executor.spawn_blocking_named_or_tokio(
585                        PAYLOAD_BUILDER_THREAD_NAME,
586                        move || {
587                            let res = builder.build_empty_payload(config);
588                            let _ = tx.send(res);
589                        },
590                    );
591
592                    empty_payload = Some(rx);
593                }
594                MissingPayloadBehaviour::RacePayload(job) => {
595                    debug!(target: "payload_builder", id=%self.config.payload_id(), "racing fallback payload");
596                    // race the in progress job with this job
597                    let (tx, rx) = oneshot::channel();
598                    self.executor.spawn_blocking_named_or_tokio(
599                        PAYLOAD_BUILDER_THREAD_NAME,
600                        move || {
601                            let _ = tx.send(job());
602                        },
603                    );
604                    empty_payload = Some(rx);
605                }
606            };
607        }
608
609        let fut = ResolveBestPayload {
610            best_payload,
611            maybe_better,
612            empty_payload: empty_payload.filter(|_| kind != PayloadKind::WaitForPending),
613        };
614
615        (fut, KeepPayloadJobAlive::No)
616    }
617}
618
619/// Represents the current state of a payload being built.
620#[derive(Debug, Clone)]
621pub enum PayloadState<P> {
622    /// No payload has been built yet.
623    Missing,
624    /// The best payload built so far, which may still be improved upon.
625    Best(P),
626    /// The payload is frozen and no further building should occur.
627    ///
628    /// Contains the final payload `P` that should be used.
629    Frozen(P),
630}
631
632impl<P> PayloadState<P> {
633    /// Checks if the payload is frozen.
634    pub const fn is_frozen(&self) -> bool {
635        matches!(self, Self::Frozen(_))
636    }
637
638    /// Returns the payload if it exists (either Best or Frozen).
639    pub const fn payload(&self) -> Option<&P> {
640        match self {
641            Self::Missing => None,
642            Self::Best(p) | Self::Frozen(p) => Some(p),
643        }
644    }
645}
646
647/// The future that returns the best payload to be served to the consensus layer.
648///
649/// This returns the payload that's supposed to be sent to the CL.
650///
651/// If payload has been built so far, it will return that, but it will check if there's a better
652/// payload available from an in progress build job. If so it will return that.
653///
654/// If no payload has been built so far, it will either return an empty payload or the result of the
655/// in progress build job, whatever finishes first.
656#[derive(Debug)]
657pub struct ResolveBestPayload<Payload> {
658    /// Best payload so far.
659    pub best_payload: Option<Payload>,
660    /// Regular payload job that's currently running that might produce a better payload.
661    pub maybe_better: Option<PendingPayload<Payload>>,
662    /// The empty payload building job in progress, if any.
663    pub empty_payload: Option<oneshot::Receiver<Result<Payload, PayloadBuilderError>>>,
664}
665
666impl<Payload> ResolveBestPayload<Payload> {
667    const fn is_empty(&self) -> bool {
668        self.best_payload.is_none() && self.maybe_better.is_none() && self.empty_payload.is_none()
669    }
670}
671
672impl<Payload> Future for ResolveBestPayload<Payload>
673where
674    Payload: Unpin,
675{
676    type Output = Result<Payload, PayloadBuilderError>;
677
678    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
679        let this = self.get_mut();
680
681        // check if there is a better payload before returning the best payload
682        if let Some(fut) = Pin::new(&mut this.maybe_better).as_pin_mut() &&
683            let Poll::Ready(res) = fut.poll(cx)
684        {
685            this.maybe_better = None;
686            if let Ok(Some(payload)) = res.map(|out| out.into_payload()).inspect_err(
687                |err| warn!(target: "payload_builder", %err, "failed to resolve pending payload"),
688            ) {
689                debug!(target: "payload_builder", "resolving better payload");
690                return Poll::Ready(Ok(payload))
691            }
692        }
693
694        if let Some(best) = this.best_payload.take() {
695            debug!(target: "payload_builder", "resolving best payload");
696            return Poll::Ready(Ok(best))
697        }
698
699        if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() &&
700            let Poll::Ready(res) = fut.poll(cx)
701        {
702            this.empty_payload = None;
703            return match res {
704                Ok(res) => {
705                    if let Err(err) = &res {
706                        warn!(target: "payload_builder", %err, "failed to resolve empty payload");
707                    } else {
708                        debug!(target: "payload_builder", "resolving empty payload");
709                    }
710                    Poll::Ready(res)
711                }
712                Err(err) => Poll::Ready(Err(err.into())),
713            }
714        }
715
716        if this.is_empty() {
717            return Poll::Ready(Err(PayloadBuilderError::MissingPayload))
718        }
719
720        Poll::Pending
721    }
722}
723
724/// A future that resolves to the result of the block building job.
725#[derive(Debug)]
726pub struct PendingPayload<P> {
727    /// The marker to cancel the job on drop
728    _cancel: CancelOnDrop,
729    /// The channel to send the result to.
730    payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
731}
732
733impl<P> PendingPayload<P> {
734    /// Constructs a `PendingPayload` future.
735    pub const fn new(
736        cancel: CancelOnDrop,
737        payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
738    ) -> Self {
739        Self { _cancel: cancel, payload }
740    }
741}
742
743impl<P> Future for PendingPayload<P> {
744    type Output = Result<BuildOutcome<P>, PayloadBuilderError>;
745
746    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
747        let res = ready!(self.payload.poll_unpin(cx));
748        Poll::Ready(res.map_err(Into::into).and_then(|res| res))
749    }
750}
751
752/// Static config for how to build a payload.
753#[derive(Clone, Debug)]
754pub struct PayloadConfig<Attributes, Header = alloy_consensus::Header> {
755    /// The parent header.
756    pub parent_header: Arc<SealedHeader<Header>>,
757    /// Additional parent block information, if available.
758    pub parent_block_info: Option<PayloadParentBlockInfo>,
759    /// Requested attributes for the payload.
760    pub attributes: Attributes,
761    /// The payload id.
762    pub payload_id: PayloadId,
763}
764
765/// Additional information about the parent block.
766#[derive(Clone, Copy, Debug, PartialEq, Eq)]
767pub struct PayloadParentBlockInfo {
768    /// Number of transactions in the parent block.
769    pub transaction_count: usize,
770}
771
772impl<Attributes, Header> PayloadConfig<Attributes, Header>
773where
774    Attributes: PayloadAttributes,
775{
776    /// Create new payload config.
777    pub const fn new(
778        parent_header: Arc<SealedHeader<Header>>,
779        attributes: Attributes,
780        payload_id: PayloadId,
781    ) -> Self {
782        Self { parent_header, parent_block_info: None, attributes, payload_id }
783    }
784
785    /// Attaches cached parent block information.
786    pub const fn with_parent_block_info(
787        mut self,
788        parent_block_info: Option<PayloadParentBlockInfo>,
789    ) -> Self {
790        self.parent_block_info = parent_block_info;
791        self
792    }
793
794    /// Returns the payload id.
795    pub const fn payload_id(&self) -> PayloadId {
796        self.payload_id
797    }
798}
799
800/// The possible outcomes of a payload building attempt.
801#[derive(Debug)]
802pub enum BuildOutcome<Payload> {
803    /// Successfully built a better block.
804    Better {
805        /// The new payload that was built.
806        payload: Payload,
807        /// The cached reads that were used to build the payload.
808        cached_reads: CachedReads,
809    },
810    /// Aborted payload building because resulted in worse block wrt. fees.
811    Aborted {
812        /// The total fees associated with the attempted payload.
813        fees: U256,
814        /// The cached reads that were used to build the payload.
815        cached_reads: CachedReads,
816    },
817    /// Build job was cancelled
818    Cancelled,
819
820    /// The payload is final and no further building should occur
821    Freeze(Payload),
822}
823
824impl<Payload> BuildOutcome<Payload> {
825    /// Consumes the type and returns the payload if the outcome is `Better` or `Freeze`.
826    pub fn into_payload(self) -> Option<Payload> {
827        match self {
828            Self::Better { payload, .. } | Self::Freeze(payload) => Some(payload),
829            _ => None,
830        }
831    }
832
833    /// Consumes the type and returns the payload if the outcome is `Better` or `Freeze`.
834    pub const fn payload(&self) -> Option<&Payload> {
835        match self {
836            Self::Better { payload, .. } | Self::Freeze(payload) => Some(payload),
837            _ => None,
838        }
839    }
840
841    /// Returns true if the outcome is `Better`.
842    pub const fn is_better(&self) -> bool {
843        matches!(self, Self::Better { .. })
844    }
845
846    /// Returns true if the outcome is `Freeze`.
847    pub const fn is_frozen(&self) -> bool {
848        matches!(self, Self::Freeze { .. })
849    }
850
851    /// Returns true if the outcome is `Aborted`.
852    pub const fn is_aborted(&self) -> bool {
853        matches!(self, Self::Aborted { .. })
854    }
855
856    /// Returns true if the outcome is `Cancelled`.
857    pub const fn is_cancelled(&self) -> bool {
858        matches!(self, Self::Cancelled)
859    }
860
861    /// Applies a fn on the current payload.
862    pub fn map_payload<F, P>(self, f: F) -> BuildOutcome<P>
863    where
864        F: FnOnce(Payload) -> P,
865    {
866        match self {
867            Self::Better { payload, cached_reads } => {
868                BuildOutcome::Better { payload: f(payload), cached_reads }
869            }
870            Self::Aborted { fees, cached_reads } => BuildOutcome::Aborted { fees, cached_reads },
871            Self::Cancelled => BuildOutcome::Cancelled,
872            Self::Freeze(payload) => BuildOutcome::Freeze(f(payload)),
873        }
874    }
875}
876
877/// The possible outcomes of a payload building attempt without reused [`CachedReads`]
878#[derive(Debug)]
879pub enum BuildOutcomeKind<Payload> {
880    /// Successfully built a better block.
881    Better {
882        /// The new payload that was built.
883        payload: Payload,
884    },
885    /// Aborted payload building because resulted in worse block wrt. fees.
886    Aborted {
887        /// The total fees associated with the attempted payload.
888        fees: U256,
889    },
890    /// Build job was cancelled
891    Cancelled,
892    /// The payload is final and no further building should occur
893    Freeze(Payload),
894}
895
896impl<Payload> BuildOutcomeKind<Payload> {
897    /// Attaches the [`CachedReads`] to the outcome.
898    pub fn with_cached_reads(self, cached_reads: CachedReads) -> BuildOutcome<Payload> {
899        match self {
900            Self::Better { payload } => BuildOutcome::Better { payload, cached_reads },
901            Self::Aborted { fees } => BuildOutcome::Aborted { fees, cached_reads },
902            Self::Cancelled => BuildOutcome::Cancelled,
903            Self::Freeze(payload) => BuildOutcome::Freeze(payload),
904        }
905    }
906}
907
908/// A collection of arguments used for building payloads.
909///
910/// This struct encapsulates the essential components and configuration required for the payload
911/// building process. It holds references to the Ethereum client, transaction pool, cached reads,
912/// payload configuration, cancellation status, and the best payload achieved so far.
913#[derive(Debug)]
914pub struct BuildArguments<Attributes, Payload: BuiltPayload> {
915    /// Previously cached disk reads
916    pub cached_reads: CachedReads,
917    /// Optional execution cache shared with the engine.
918    pub execution_cache: Option<SavedCache>,
919    /// Optional state root task handle, shared with the engine.
920    ///
921    /// The preserved trie is shared with the engine, so a concurrent `newPayload` will
922    /// block until this task completes. The trie is anchored at the built block's state
923    /// root, so if the next `newPayload` is not on top of that block, the trie cache is
924    /// invalidated and cleared.
925    pub trie_handle: Option<StateRootHandle>,
926    /// How to configure the payload.
927    pub config: PayloadConfig<Attributes, HeaderTy<Payload::Primitives>>,
928    /// A marker that can be used to cancel the job.
929    pub cancel: CancelOnDrop,
930    /// The best payload achieved so far.
931    pub best_payload: Option<Payload>,
932}
933
934impl<Attributes, Payload: BuiltPayload> BuildArguments<Attributes, Payload> {
935    /// Create new build arguments.
936    pub const fn new(
937        cached_reads: CachedReads,
938        execution_cache: Option<SavedCache>,
939        trie_handle: Option<StateRootHandle>,
940        config: PayloadConfig<Attributes, HeaderTy<Payload::Primitives>>,
941        cancel: CancelOnDrop,
942        best_payload: Option<Payload>,
943    ) -> Self {
944        Self { cached_reads, execution_cache, trie_handle, config, cancel, best_payload }
945    }
946}
947
948/// A trait for building payloads that encapsulate Ethereum transactions.
949///
950/// This trait provides the `try_build` method to construct a transaction payload
951/// using `BuildArguments`. It returns a `Result` indicating success or a
952/// `PayloadBuilderError` if building fails.
953///
954/// Generic parameters `Pool` and `Client` represent the transaction pool and
955/// Ethereum client types.
956pub trait PayloadBuilder: Send + Sync + Clone {
957    /// The payload attributes type to accept for building.
958    type Attributes: PayloadAttributes;
959    /// The type of the built payload.
960    type BuiltPayload: BuiltPayload;
961
962    /// Tries to build a transaction payload using provided arguments.
963    ///
964    /// Constructs a transaction payload based on the given arguments,
965    /// returning a `Result` indicating success or an error if building fails.
966    ///
967    /// # Arguments
968    ///
969    /// - `args`: Build arguments containing necessary components.
970    ///
971    /// # Returns
972    ///
973    /// A `Result` indicating the build outcome or an error.
974    fn try_build(
975        &self,
976        args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
977    ) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError>;
978
979    /// Invoked when the payload job is being resolved and there is no payload yet.
980    ///
981    /// This can happen if the CL requests a payload before the first payload has been built.
982    fn on_missing_payload(
983        &self,
984        _args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
985    ) -> MissingPayloadBehaviour<Self::BuiltPayload> {
986        MissingPayloadBehaviour::RaceEmptyPayload
987    }
988
989    /// Builds an empty payload without any transaction.
990    fn build_empty_payload(
991        &self,
992        config: PayloadConfig<Self::Attributes, HeaderForPayload<Self::BuiltPayload>>,
993    ) -> Result<Self::BuiltPayload, PayloadBuilderError>;
994}
995
996/// Tells the payload builder how to react to payload request if there's no payload available yet.
997///
998/// This situation can occur if the CL requests a payload before the first payload has been built.
999#[derive(Default)]
1000pub enum MissingPayloadBehaviour<Payload> {
1001    /// Await the regular scheduled payload process.
1002    AwaitInProgress,
1003    /// Race the in progress payload process with an empty payload.
1004    #[default]
1005    RaceEmptyPayload,
1006    /// Race the in progress payload process with this job.
1007    RacePayload(Box<dyn FnOnce() -> Result<Payload, PayloadBuilderError> + Send>),
1008}
1009
1010impl<Payload> fmt::Debug for MissingPayloadBehaviour<Payload> {
1011    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1012        match self {
1013            Self::AwaitInProgress => write!(f, "AwaitInProgress"),
1014            Self::RaceEmptyPayload => {
1015                write!(f, "RaceEmptyPayload")
1016            }
1017            Self::RacePayload(_) => write!(f, "RacePayload"),
1018        }
1019    }
1020}
1021
1022/// Checks if the new payload is better than the current best.
1023///
1024/// This compares the total fees of the blocks, higher is better.
1025#[inline(always)]
1026pub fn is_better_payload<T: BuiltPayload>(best_payload: Option<&T>, new_fees: U256) -> bool {
1027    if let Some(best_payload) = best_payload {
1028        new_fees > best_payload.fees()
1029    } else {
1030        true
1031    }
1032}
1033
1034/// Returns the duration until the given unix timestamp in seconds.
1035///
1036/// Returns `Duration::ZERO` if the given timestamp is in the past.
1037fn duration_until(unix_timestamp_secs: u64) -> Duration {
1038    let unix_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
1039    let timestamp = Duration::from_secs(unix_timestamp_secs);
1040    timestamp.saturating_sub(unix_now)
1041}