Skip to main content

reth_basic_payload_builder/
lib.rs

1//! A basic payload generator for reth.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use crate::metrics::PayloadBuilderMetrics;
12use alloy_eips::merge::SLOT_DURATION;
13use alloy_primitives::{B256, U256};
14use futures_core::ready;
15use futures_util::FutureExt;
16use reth_chain_state::CanonStateNotification;
17use reth_execution_cache::SavedCache;
18use reth_payload_builder::{
19    BuildNewPayload, KeepPayloadJobAlive, PayloadId, PayloadJob, PayloadJobGenerator,
20};
21use reth_payload_builder_primitives::PayloadBuilderError;
22use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind};
23use reth_primitives_traits::{HeaderTy, NodePrimitives, SealedHeader};
24use reth_revm::{cached::CachedReads, cancelled::CancelOnDrop};
25use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
26use reth_tasks::Runtime;
27use reth_trie_parallel::state_root_task::StateRootHandle;
28use std::{
29    fmt,
30    future::Future,
31    ops::Deref,
32    pin::Pin,
33    sync::Arc,
34    task::{Context, Poll},
35    time::{Duration, SystemTime, UNIX_EPOCH},
36};
37use tokio::{
38    sync::{oneshot, Semaphore},
39    time::{Interval, Sleep},
40};
41use tracing::{debug, trace, warn};
42
43mod better_payload_emitter;
44mod metrics;
45mod stack;
46
47pub use better_payload_emitter::BetterPayloadEmitter;
48pub use stack::PayloadBuilderStack;
49
50/// Helper to access [`NodePrimitives::BlockHeader`] from [`PayloadBuilder::BuiltPayload`].
51pub type HeaderForPayload<P> = <<P as BuiltPayload>::Primitives as NodePrimitives>::BlockHeader;
52
53/// The [`PayloadJobGenerator`] that creates [`BasicPayloadJob`]s.
54#[derive(Debug)]
55pub struct BasicPayloadJobGenerator<Client, Builder> {
56    /// The client that can interact with the chain.
57    client: Client,
58    /// The task executor to spawn payload building tasks on.
59    executor: Runtime,
60    /// The configuration for the job generator.
61    config: BasicPayloadJobGeneratorConfig,
62    /// Restricts how many generator tasks can be executed at once.
63    payload_task_guard: PayloadTaskGuard,
64    /// The type responsible for building payloads.
65    ///
66    /// See [`PayloadBuilder`]
67    builder: Builder,
68    /// Stored `cached_reads` for new payload jobs.
69    pre_cached: Option<PrecachedState>,
70}
71
72// === impl BasicPayloadJobGenerator ===
73
74impl<Client, Builder> BasicPayloadJobGenerator<Client, Builder> {
75    /// Creates a new [`BasicPayloadJobGenerator`] with the given config and custom
76    /// [`PayloadBuilder`]
77    pub fn with_builder(
78        client: Client,
79        executor: Runtime,
80        config: BasicPayloadJobGeneratorConfig,
81        builder: Builder,
82    ) -> Self {
83        Self {
84            client,
85            executor,
86            payload_task_guard: PayloadTaskGuard::new(config.max_payload_tasks),
87            config,
88            builder,
89            pre_cached: None,
90        }
91    }
92
93    /// Returns the maximum duration a job should be allowed to run.
94    ///
95    /// This adheres to the following specification:
96    /// > Client software SHOULD stop the updating process when either a call to engine_getPayload
97    /// > with the build process's payloadId is made or SECONDS_PER_SLOT (12s in the Mainnet
98    /// > configuration) have passed since the point in time identified by the timestamp parameter.
99    ///
100    /// See also <https://github.com/ethereum/execution-apis/blob/431cf72fd3403d946ca3e3afc36b973fc87e0e89/src/engine/paris.md?plain=1#L137>
101    #[inline]
102    fn max_job_duration(&self, unix_timestamp: u64) -> Duration {
103        let duration_until_timestamp = duration_until(unix_timestamp);
104
105        // safety in case clocks are bad
106        let duration_until_timestamp = duration_until_timestamp.min(self.config.deadline * 3);
107
108        self.config.deadline + duration_until_timestamp
109    }
110
111    /// Returns the [Instant](tokio::time::Instant) at which the job should be terminated because it
112    /// is considered timed out.
113    #[inline]
114    fn job_deadline(&self, unix_timestamp: u64) -> tokio::time::Instant {
115        tokio::time::Instant::now() + self.max_job_duration(unix_timestamp)
116    }
117
118    /// Returns a reference to the tasks type
119    pub const fn tasks(&self) -> &Runtime {
120        &self.executor
121    }
122
123    /// Returns the pre-cached reads for the given parent header if it matches the cached state's
124    /// block.
125    fn maybe_pre_cached(&self, parent: B256) -> Option<CachedReads> {
126        self.pre_cached.as_ref().filter(|pc| pc.block == parent).map(|pc| pc.cached.clone())
127    }
128}
129
// === impl PayloadJobGenerator for BasicPayloadJobGenerator ===
131
132impl<Client, Builder> PayloadJobGenerator for BasicPayloadJobGenerator<Client, Builder>
133where
134    Client: StateProviderFactory
135        + BlockReaderIdExt<Header = HeaderForPayload<Builder::BuiltPayload>>
136        + Clone
137        + Unpin
138        + 'static,
139    Builder: PayloadBuilder + Unpin + 'static,
140    Builder::Attributes: Unpin + Clone,
141    Builder::BuiltPayload: Unpin + Clone,
142{
143    type Job = BasicPayloadJob<Builder>;
144
145    fn new_payload_job(
146        &self,
147        input: BuildNewPayload<Builder::Attributes>,
148        id: PayloadId,
149    ) -> Result<Self::Job, PayloadBuilderError> {
150        let parent_header = if input.parent_hash.is_zero() {
151            // Use latest header for genesis block case
152            self.client
153                .latest_header()
154                .map_err(PayloadBuilderError::from)?
155                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(B256::ZERO))?
156        } else {
157            // Fetch specific header by hash
158            self.client
159                .sealed_header_by_hash(input.parent_hash)
160                .map_err(PayloadBuilderError::from)?
161                .ok_or_else(|| PayloadBuilderError::MissingParentHeader(input.parent_hash))?
162        };
163
164        let cached_reads = self.maybe_pre_cached(parent_header.hash());
165
166        let config = PayloadConfig::new(Arc::new(parent_header), input.attributes, id);
167
168        let until = self.job_deadline(config.attributes.timestamp());
169        let deadline = Box::pin(tokio::time::sleep_until(until));
170
171        let mut job = BasicPayloadJob {
172            config,
173            executor: self.executor.clone(),
174            deadline,
175            // ticks immediately
176            interval: tokio::time::interval(self.config.interval),
177            best_payload: PayloadState::Missing,
178            pending_block: None,
179            cached_reads,
180            execution_cache: input.cache,
181            trie_handle: input.trie_handle,
182            payload_task_guard: self.payload_task_guard.clone(),
183            metrics: Default::default(),
184            builder: self.builder.clone(),
185        };
186
187        // start the first job right away
188        job.spawn_build_job();
189
190        Ok(job)
191    }
192
193    fn on_new_state<N: NodePrimitives>(&mut self, new_state: CanonStateNotification<N>) {
194        let mut cached = CachedReads::default();
195
196        // extract the state from the notification and put it into the cache
197        let committed = new_state.committed();
198        let new_execution_outcome = committed.execution_outcome();
199        for (addr, acc) in new_execution_outcome.bundle_accounts_iter() {
200            if let Some(info) = acc.info.clone() {
201                // we want pre cache existing accounts and their storage
202                // this only includes changed accounts and storage but is better than nothing
203                let storage =
204                    acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
205                cached.insert_account(addr, info, storage);
206            }
207        }
208
209        self.pre_cached = Some(PrecachedState { block: committed.tip().hash(), cached });
210    }
211}
212
213/// Pre-filled [`CachedReads`] for a specific block.
214///
215/// This is extracted from the [`CanonStateNotification`] for the tip block.
216#[derive(Debug, Clone)]
217pub struct PrecachedState {
218    /// The block for which the state is pre-cached.
219    pub block: B256,
220    /// Cached state for the block.
221    pub cached: CachedReads,
222}
223
224/// Restricts how many generator tasks can be executed at once.
225#[derive(Debug, Clone)]
226pub struct PayloadTaskGuard(Arc<Semaphore>);
227
228impl Deref for PayloadTaskGuard {
229    type Target = Semaphore;
230
231    fn deref(&self) -> &Self::Target {
232        &self.0
233    }
234}
235
236// === impl PayloadTaskGuard ===
237
238impl PayloadTaskGuard {
239    /// Constructs `Self` with a maximum task count of `max_payload_tasks`.
240    pub fn new(max_payload_tasks: usize) -> Self {
241        Self(Arc::new(Semaphore::new(max_payload_tasks)))
242    }
243}
244
/// Configuration of the [`BasicPayloadJobGenerator`].
#[derive(Clone, Debug)]
pub struct BasicPayloadJobGeneratorConfig {
    /// How long to wait after a build attempt before starting the next one.
    interval: Duration,
    /// How long a payload job may run before it resolves.
    ///
    /// By default this is [`SLOT_DURATION`]: 12s
    deadline: Duration,
    /// Upper bound on concurrently running payload build tasks.
    max_payload_tasks: usize,
}
257
258// === impl BasicPayloadJobGeneratorConfig ===
259
260impl BasicPayloadJobGeneratorConfig {
261    /// Sets the interval at which the job should build a new payload after the last.
262    pub const fn interval(mut self, interval: Duration) -> Self {
263        self.interval = interval;
264        self
265    }
266
267    /// Sets the deadline when this job should resolve.
268    pub const fn deadline(mut self, deadline: Duration) -> Self {
269        self.deadline = deadline;
270        self
271    }
272
273    /// Sets the maximum number of tasks to spawn for building a payload(s).
274    ///
275    /// # Panics
276    ///
277    /// If `max_payload_tasks` is 0.
278    pub fn max_payload_tasks(mut self, max_payload_tasks: usize) -> Self {
279        assert!(max_payload_tasks > 0, "max_payload_tasks must be greater than 0");
280        self.max_payload_tasks = max_payload_tasks;
281        self
282    }
283}
284
285impl Default for BasicPayloadJobGeneratorConfig {
286    fn default() -> Self {
287        Self {
288            interval: Duration::from_secs(1),
289            // 12s slot time
290            deadline: SLOT_DURATION,
291            max_payload_tasks: 3,
292        }
293    }
294}
295
296/// A basic payload job that continuously builds a payload with the best transactions from the pool.
297///
298/// This type is a [`PayloadJob`] and [`Future`] that terminates when the deadline is reached or
299/// when the job is resolved: [`PayloadJob::resolve`].
300///
301/// This basic job implementation will trigger new payload build task continuously until the job is
302/// resolved or the deadline is reached, or until the built payload is marked as frozen:
303/// [`BuildOutcome::Freeze`]. Once a frozen payload is returned, no additional payloads will be
304/// built and this future will wait to be resolved: [`PayloadJob::resolve`] or terminated if the
305/// deadline is reached.
306#[derive(Debug)]
307pub struct BasicPayloadJob<Builder>
308where
309    Builder: PayloadBuilder,
310{
311    /// The configuration for how the payload will be created.
312    config: PayloadConfig<Builder::Attributes, HeaderForPayload<Builder::BuiltPayload>>,
313    /// How to spawn building tasks
314    executor: Runtime,
315    /// The deadline when this job should resolve.
316    deadline: Pin<Box<Sleep>>,
317    /// The interval at which the job should build a new payload after the last.
318    interval: Interval,
319    /// The best payload so far and its state.
320    best_payload: PayloadState<Builder::BuiltPayload>,
321    /// Receiver for the block that is currently being built.
322    pending_block: Option<PendingPayload<Builder::BuiltPayload>>,
323    /// Restricts how many generator tasks can be executed at once.
324    payload_task_guard: PayloadTaskGuard,
325    /// Caches all disk reads for the state the new payloads builds on
326    ///
327    /// This is used to avoid reading the same state over and over again when new attempts are
328    /// triggered, because during the building process we'll repeatedly execute the transactions.
329    cached_reads: Option<CachedReads>,
330    /// Optional execution cache shared with the engine.
331    execution_cache: Option<SavedCache>,
332    /// Optional state root task handle, shared with the engine.
333    trie_handle: Option<StateRootHandle>,
334    /// metrics for this type
335    metrics: PayloadBuilderMetrics,
336    /// The type responsible for building payloads.
337    ///
338    /// See [`PayloadBuilder`]
339    builder: Builder,
340}
341
342impl<Builder> BasicPayloadJob<Builder>
343where
344    Builder: PayloadBuilder + Unpin + 'static,
345    Builder::Attributes: Unpin + Clone,
346    Builder::BuiltPayload: Unpin + Clone,
347{
348    /// Spawns a new payload build task.
349    fn spawn_build_job(&mut self) {
350        trace!(target: "payload_builder", id = %self.config.payload_id(), "spawn new payload build task");
351        let (tx, rx) = oneshot::channel();
352        let cancel = CancelOnDrop::default();
353        let _cancel = cancel.clone();
354        let guard = self.payload_task_guard.clone();
355        let payload_config = self.config.clone();
356        let best_payload = self.best_payload.payload().cloned();
357        self.metrics.inc_initiated_payload_builds();
358        let cached_reads = self.cached_reads.take().unwrap_or_default();
359        let execution_cache = self.execution_cache.clone();
360        let trie_handle = self.trie_handle.take();
361        let builder = self.builder.clone();
362        self.executor.spawn_blocking_task(async move {
363            // acquire the permit for executing the task
364            let _permit = guard.acquire().await;
365            let args = BuildArguments {
366                cached_reads,
367                execution_cache,
368                trie_handle,
369                config: payload_config,
370                cancel,
371                best_payload,
372            };
373            let result = builder.try_build(args);
374            let _ = tx.send(result);
375        });
376
377        self.pending_block = Some(PendingPayload { _cancel, payload: rx });
378    }
379}
380
381impl<Builder> Future for BasicPayloadJob<Builder>
382where
383    Builder: PayloadBuilder + Unpin + 'static,
384    Builder::Attributes: Unpin + Clone,
385    Builder::BuiltPayload: Unpin + Clone,
386{
387    type Output = Result<(), PayloadBuilderError>;
388
389    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
390        let this = self.get_mut();
391
392        // check if the deadline is reached
393        if this.deadline.as_mut().poll(cx).is_ready() {
394            trace!(target: "payload_builder", "payload building deadline reached");
395            return Poll::Ready(Ok(()))
396        }
397
398        loop {
399            // Wait for any pending build to complete before polling the next tick.
400            //
401            // This avoids consuming interval ticks while a build is still in-flight,
402            // which would delay the follow-up build by a full interval even though
403            // the current attempt has already finished.
404            if let Some(mut fut) = this.pending_block.take() {
405                match fut.poll_unpin(cx) {
406                    Poll::Ready(Ok(outcome)) => match outcome {
407                        BuildOutcome::Better { payload, cached_reads } => {
408                            this.cached_reads = Some(cached_reads);
409                            debug!(target: "payload_builder", value = %payload.fees(), "built better payload");
410                            this.best_payload = PayloadState::Best(payload);
411                        }
412                        BuildOutcome::Freeze(payload) => {
413                            debug!(target: "payload_builder", "payload frozen, no further building will occur");
414                            this.best_payload = PayloadState::Frozen(payload);
415                        }
416                        BuildOutcome::Aborted { fees, cached_reads } => {
417                            this.cached_reads = Some(cached_reads);
418                            trace!(target: "payload_builder", worse_fees = %fees, "skipped payload build of worse block");
419                        }
420                        BuildOutcome::Cancelled => {
421                            unreachable!("the cancel signal never fired")
422                        }
423                    },
424                    Poll::Ready(Err(error)) => {
425                        // job failed, but we simply try again next interval
426                        debug!(target: "payload_builder", %error, "payload build attempt failed");
427                        this.metrics.inc_failed_payload_builds();
428                    }
429                    Poll::Pending => {
430                        this.pending_block = Some(fut);
431                        return Poll::Pending
432                    }
433                }
434            }
435
436            if this.best_payload.is_frozen() {
437                return Poll::Pending
438            }
439
440            // Wait for the next build interval tick.
441            //
442            // The loop is needed because `poll_tick` does not register a waker
443            // when it returns `Ready`, so we must loop back after spawning a job
444            // to reach a point that *does* register one (the pending block poll above).
445            ready!(this.interval.poll_tick(cx));
446            this.spawn_build_job()
447        }
448    }
449}
450
451impl<Builder> PayloadJob for BasicPayloadJob<Builder>
452where
453    Builder: PayloadBuilder + Unpin + 'static,
454    Builder::Attributes: Unpin + Clone,
455    Builder::BuiltPayload: Unpin + Clone,
456{
457    type PayloadAttributes = Builder::Attributes;
458    type ResolvePayloadFuture = ResolveBestPayload<Self::BuiltPayload>;
459    type BuiltPayload = Builder::BuiltPayload;
460
461    fn best_payload(&self) -> Result<Self::BuiltPayload, PayloadBuilderError> {
462        if let Some(payload) = self.best_payload.payload() {
463            Ok(payload.clone())
464        } else {
465            // No payload has been built yet, but we need to return something that the CL then
466            // can deliver, so we need to return an empty payload.
467            //
468            // Note: it is assumed that this is unlikely to happen, as the payload job is
469            // started right away and the first full block should have been
470            // built by the time CL is requesting the payload.
471            self.metrics.inc_requested_empty_payload();
472            self.builder.build_empty_payload(self.config.clone())
473        }
474    }
475
476    fn payload_attributes(&self) -> Result<Self::PayloadAttributes, PayloadBuilderError> {
477        Ok(self.config.attributes.clone())
478    }
479
480    fn payload_timestamp(&self) -> Result<u64, PayloadBuilderError> {
481        Ok(self.config.attributes.timestamp())
482    }
483
484    fn resolve_kind(
485        &mut self,
486        kind: PayloadKind,
487    ) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
488        let best_payload = self.best_payload.payload().cloned();
489        if best_payload.is_none() && self.pending_block.is_none() {
490            // ensure we have a job scheduled if we don't have a best payload yet and none is active
491            self.spawn_build_job();
492        }
493
494        let maybe_better = self.pending_block.take();
495        let mut empty_payload = None;
496
497        if best_payload.is_none() {
498            debug!(target: "payload_builder", id=%self.config.payload_id(), "no best payload yet to resolve, building empty payload");
499
500            let args = BuildArguments {
501                cached_reads: self.cached_reads.take().unwrap_or_default(),
502                execution_cache: self.execution_cache.clone(),
503                trie_handle: None,
504                config: self.config.clone(),
505                cancel: CancelOnDrop::default(),
506                best_payload: None,
507            };
508
509            match self.builder.on_missing_payload(args) {
510                MissingPayloadBehaviour::AwaitInProgress => {
511                    debug!(target: "payload_builder", id=%self.config.payload_id(), "awaiting in progress payload build job");
512                }
513                MissingPayloadBehaviour::RaceEmptyPayload => {
514                    debug!(target: "payload_builder", id=%self.config.payload_id(), "racing empty payload");
515
516                    // if no payload has been built yet
517                    self.metrics.inc_requested_empty_payload();
518                    // no payload built yet, so we need to return an empty payload
519                    let (tx, rx) = oneshot::channel();
520                    let config = self.config.clone();
521                    let builder = self.builder.clone();
522                    self.executor.spawn_blocking_task(async move {
523                        let res = builder.build_empty_payload(config);
524                        let _ = tx.send(res);
525                    });
526
527                    empty_payload = Some(rx);
528                }
529                MissingPayloadBehaviour::RacePayload(job) => {
530                    debug!(target: "payload_builder", id=%self.config.payload_id(), "racing fallback payload");
531                    // race the in progress job with this job
532                    let (tx, rx) = oneshot::channel();
533                    self.executor.spawn_blocking_task(async move {
534                        let _ = tx.send(job());
535                    });
536                    empty_payload = Some(rx);
537                }
538            };
539        }
540
541        let fut = ResolveBestPayload {
542            best_payload,
543            maybe_better,
544            empty_payload: empty_payload.filter(|_| kind != PayloadKind::WaitForPending),
545        };
546
547        (fut, KeepPayloadJobAlive::No)
548    }
549}
550
/// The state of the payload a job is building.
#[derive(Clone, Debug)]
pub enum PayloadState<P> {
    /// Nothing has been built yet.
    Missing,
    /// Best payload so far; a later build may still replace it.
    Best(P),
    /// Final payload; no further building should occur.
    ///
    /// Holds the payload `P` that should be used.
    Frozen(P),
}
563
564impl<P> PayloadState<P> {
565    /// Checks if the payload is frozen.
566    pub const fn is_frozen(&self) -> bool {
567        matches!(self, Self::Frozen(_))
568    }
569
570    /// Returns the payload if it exists (either Best or Frozen).
571    pub const fn payload(&self) -> Option<&P> {
572        match self {
573            Self::Missing => None,
574            Self::Best(p) | Self::Frozen(p) => Some(p),
575        }
576    }
577}
578
579/// The future that returns the best payload to be served to the consensus layer.
580///
581/// This returns the payload that's supposed to be sent to the CL.
582///
583/// If payload has been built so far, it will return that, but it will check if there's a better
584/// payload available from an in progress build job. If so it will return that.
585///
586/// If no payload has been built so far, it will either return an empty payload or the result of the
587/// in progress build job, whatever finishes first.
588#[derive(Debug)]
589pub struct ResolveBestPayload<Payload> {
590    /// Best payload so far.
591    pub best_payload: Option<Payload>,
592    /// Regular payload job that's currently running that might produce a better payload.
593    pub maybe_better: Option<PendingPayload<Payload>>,
594    /// The empty payload building job in progress, if any.
595    pub empty_payload: Option<oneshot::Receiver<Result<Payload, PayloadBuilderError>>>,
596}
597
598impl<Payload> ResolveBestPayload<Payload> {
599    const fn is_empty(&self) -> bool {
600        self.best_payload.is_none() && self.maybe_better.is_none() && self.empty_payload.is_none()
601    }
602}
603
604impl<Payload> Future for ResolveBestPayload<Payload>
605where
606    Payload: Unpin,
607{
608    type Output = Result<Payload, PayloadBuilderError>;
609
610    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
611        let this = self.get_mut();
612
613        // check if there is a better payload before returning the best payload
614        if let Some(fut) = Pin::new(&mut this.maybe_better).as_pin_mut() &&
615            let Poll::Ready(res) = fut.poll(cx)
616        {
617            this.maybe_better = None;
618            if let Ok(Some(payload)) = res.map(|out| out.into_payload()).inspect_err(
619                |err| warn!(target: "payload_builder", %err, "failed to resolve pending payload"),
620            ) {
621                debug!(target: "payload_builder", "resolving better payload");
622                return Poll::Ready(Ok(payload))
623            }
624        }
625
626        if let Some(best) = this.best_payload.take() {
627            debug!(target: "payload_builder", "resolving best payload");
628            return Poll::Ready(Ok(best))
629        }
630
631        if let Some(fut) = Pin::new(&mut this.empty_payload).as_pin_mut() &&
632            let Poll::Ready(res) = fut.poll(cx)
633        {
634            this.empty_payload = None;
635            return match res {
636                Ok(res) => {
637                    if let Err(err) = &res {
638                        warn!(target: "payload_builder", %err, "failed to resolve empty payload");
639                    } else {
640                        debug!(target: "payload_builder", "resolving empty payload");
641                    }
642                    Poll::Ready(res)
643                }
644                Err(err) => Poll::Ready(Err(err.into())),
645            }
646        }
647
648        if this.is_empty() {
649            return Poll::Ready(Err(PayloadBuilderError::MissingPayload))
650        }
651
652        Poll::Pending
653    }
654}
655
656/// A future that resolves to the result of the block building job.
657#[derive(Debug)]
658pub struct PendingPayload<P> {
659    /// The marker to cancel the job on drop
660    _cancel: CancelOnDrop,
661    /// The channel to send the result to.
662    payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
663}
664
665impl<P> PendingPayload<P> {
666    /// Constructs a `PendingPayload` future.
667    pub const fn new(
668        cancel: CancelOnDrop,
669        payload: oneshot::Receiver<Result<BuildOutcome<P>, PayloadBuilderError>>,
670    ) -> Self {
671        Self { _cancel: cancel, payload }
672    }
673}
674
675impl<P> Future for PendingPayload<P> {
676    type Output = Result<BuildOutcome<P>, PayloadBuilderError>;
677
678    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
679        let res = ready!(self.payload.poll_unpin(cx));
680        Poll::Ready(res.map_err(Into::into).and_then(|res| res))
681    }
682}
683
684/// Static config for how to build a payload.
685#[derive(Clone, Debug)]
686pub struct PayloadConfig<Attributes, Header = alloy_consensus::Header> {
687    /// The parent header.
688    pub parent_header: Arc<SealedHeader<Header>>,
689    /// Requested attributes for the payload.
690    pub attributes: Attributes,
691    /// The payload id.
692    pub payload_id: PayloadId,
693}
694
695impl<Attributes, Header> PayloadConfig<Attributes, Header>
696where
697    Attributes: PayloadAttributes,
698{
699    /// Create new payload config.
700    pub const fn new(
701        parent_header: Arc<SealedHeader<Header>>,
702        attributes: Attributes,
703        payload_id: PayloadId,
704    ) -> Self {
705        Self { parent_header, attributes, payload_id }
706    }
707
708    /// Returns the payload id.
709    pub const fn payload_id(&self) -> PayloadId {
710        self.payload_id
711    }
712}
713
714/// The possible outcomes of a payload building attempt.
715#[derive(Debug)]
716pub enum BuildOutcome<Payload> {
717    /// Successfully built a better block.
718    Better {
719        /// The new payload that was built.
720        payload: Payload,
721        /// The cached reads that were used to build the payload.
722        cached_reads: CachedReads,
723    },
724    /// Aborted payload building because resulted in worse block wrt. fees.
725    Aborted {
726        /// The total fees associated with the attempted payload.
727        fees: U256,
728        /// The cached reads that were used to build the payload.
729        cached_reads: CachedReads,
730    },
731    /// Build job was cancelled
732    Cancelled,
733
734    /// The payload is final and no further building should occur
735    Freeze(Payload),
736}
737
738impl<Payload> BuildOutcome<Payload> {
739    /// Consumes the type and returns the payload if the outcome is `Better` or `Freeze`.
740    pub fn into_payload(self) -> Option<Payload> {
741        match self {
742            Self::Better { payload, .. } | Self::Freeze(payload) => Some(payload),
743            _ => None,
744        }
745    }
746
747    /// Consumes the type and returns the payload if the outcome is `Better` or `Freeze`.
748    pub const fn payload(&self) -> Option<&Payload> {
749        match self {
750            Self::Better { payload, .. } | Self::Freeze(payload) => Some(payload),
751            _ => None,
752        }
753    }
754
755    /// Returns true if the outcome is `Better`.
756    pub const fn is_better(&self) -> bool {
757        matches!(self, Self::Better { .. })
758    }
759
760    /// Returns true if the outcome is `Freeze`.
761    pub const fn is_frozen(&self) -> bool {
762        matches!(self, Self::Freeze { .. })
763    }
764
765    /// Returns true if the outcome is `Aborted`.
766    pub const fn is_aborted(&self) -> bool {
767        matches!(self, Self::Aborted { .. })
768    }
769
770    /// Returns true if the outcome is `Cancelled`.
771    pub const fn is_cancelled(&self) -> bool {
772        matches!(self, Self::Cancelled)
773    }
774
775    /// Applies a fn on the current payload.
776    pub fn map_payload<F, P>(self, f: F) -> BuildOutcome<P>
777    where
778        F: FnOnce(Payload) -> P,
779    {
780        match self {
781            Self::Better { payload, cached_reads } => {
782                BuildOutcome::Better { payload: f(payload), cached_reads }
783            }
784            Self::Aborted { fees, cached_reads } => BuildOutcome::Aborted { fees, cached_reads },
785            Self::Cancelled => BuildOutcome::Cancelled,
786            Self::Freeze(payload) => BuildOutcome::Freeze(f(payload)),
787        }
788    }
789}
790
/// The possible outcomes of a payload building attempt, without the reused [`CachedReads`].
///
/// Use [`BuildOutcomeKind::with_cached_reads`] to turn a value of this type into a
/// [`BuildOutcome`].
#[derive(Debug)]
pub enum BuildOutcomeKind<Payload> {
    /// Successfully built a better block.
    Better {
        /// The new payload that was built.
        payload: Payload,
    },
    /// Aborted payload building because it resulted in a worse block with respect to fees.
    Aborted {
        /// The total fees associated with the attempted payload.
        fees: U256,
    },
    /// The build job was cancelled.
    Cancelled,
    /// The payload is final and no further building should occur.
    Freeze(Payload),
}
809
810impl<Payload> BuildOutcomeKind<Payload> {
811    /// Attaches the [`CachedReads`] to the outcome.
812    pub fn with_cached_reads(self, cached_reads: CachedReads) -> BuildOutcome<Payload> {
813        match self {
814            Self::Better { payload } => BuildOutcome::Better { payload, cached_reads },
815            Self::Aborted { fees } => BuildOutcome::Aborted { fees, cached_reads },
816            Self::Cancelled => BuildOutcome::Cancelled,
817            Self::Freeze(payload) => BuildOutcome::Freeze(payload),
818        }
819    }
820}
821
/// A collection of arguments used for building payloads.
///
/// This struct encapsulates the components and configuration handed to a payload build
/// attempt. It holds the cached reads, the optional execution cache and state root task
/// handle, the payload configuration, the cancellation marker, and the best payload achieved
/// so far.
#[derive(Debug)]
pub struct BuildArguments<Attributes, Payload: BuiltPayload> {
    /// Previously cached disk reads.
    pub cached_reads: CachedReads,
    /// Optional execution cache shared with the engine.
    pub execution_cache: Option<SavedCache>,
    /// Optional state root task handle, shared with the engine.
    ///
    /// The preserved trie is shared with the engine, so a concurrent `newPayload` will
    /// block until this task completes. The trie is anchored at the built block's state
    /// root, so if the next `newPayload` is not on top of that block, the trie cache is
    /// invalidated and cleared.
    pub trie_handle: Option<StateRootHandle>,
    /// How to configure the payload.
    pub config: PayloadConfig<Attributes, HeaderTy<Payload::Primitives>>,
    /// A marker that can be used to cancel the job.
    pub cancel: CancelOnDrop,
    /// The best payload achieved so far, or `None` if there is none yet.
    pub best_payload: Option<Payload>,
}
847
848impl<Attributes, Payload: BuiltPayload> BuildArguments<Attributes, Payload> {
849    /// Create new build arguments.
850    pub const fn new(
851        cached_reads: CachedReads,
852        execution_cache: Option<SavedCache>,
853        trie_handle: Option<StateRootHandle>,
854        config: PayloadConfig<Attributes, HeaderTy<Payload::Primitives>>,
855        cancel: CancelOnDrop,
856        best_payload: Option<Payload>,
857    ) -> Self {
858        Self { cached_reads, execution_cache, trie_handle, config, cancel, best_payload }
859    }
860}
861
/// A trait for building payloads that encapsulate Ethereum transactions.
///
/// This trait provides the `try_build` method to construct a transaction payload
/// using `BuildArguments`. It returns a `Result` indicating success or a
/// `PayloadBuilderError` if building fails.
///
/// The trait has no generic parameters of its own; implementors select the accepted payload
/// attributes and the produced payload through the `Attributes` and `BuiltPayload`
/// associated types.
pub trait PayloadBuilder: Send + Sync + Clone {
    /// The payload attributes type to accept for building.
    type Attributes: PayloadAttributes;
    /// The type of the built payload.
    type BuiltPayload: BuiltPayload;

    /// Tries to build a transaction payload using provided arguments.
    ///
    /// Constructs a transaction payload based on the given arguments,
    /// returning a `Result` indicating success or an error if building fails.
    ///
    /// # Arguments
    ///
    /// - `args`: Build arguments containing necessary components.
    ///
    /// # Returns
    ///
    /// A `Result` holding the [`BuildOutcome`] of the attempt, or a [`PayloadBuilderError`].
    fn try_build(
        &self,
        args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
    ) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError>;

    /// Invoked when the payload job is being resolved and there is no payload yet.
    ///
    /// This can happen if the CL requests a payload before the first payload has been built.
    ///
    /// The default implementation ignores its arguments and returns
    /// [`MissingPayloadBehaviour::RaceEmptyPayload`].
    fn on_missing_payload(
        &self,
        _args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
    ) -> MissingPayloadBehaviour<Self::BuiltPayload> {
        MissingPayloadBehaviour::RaceEmptyPayload
    }

    /// Builds an empty payload without any transaction.
    ///
    /// Returns the built payload, or a [`PayloadBuilderError`] if building fails.
    fn build_empty_payload(
        &self,
        config: PayloadConfig<Self::Attributes, HeaderForPayload<Self::BuiltPayload>>,
    ) -> Result<Self::BuiltPayload, PayloadBuilderError>;
}
909
/// Tells the payload builder how to react to a payload request if there's no payload
/// available yet.
///
/// This situation can occur if the CL requests a payload before the first payload has been built.
///
/// The default is [`MissingPayloadBehaviour::RaceEmptyPayload`].
#[derive(Default)]
pub enum MissingPayloadBehaviour<Payload> {
    /// Await the regular scheduled payload process.
    AwaitInProgress,
    /// Race the in progress payload process with an empty payload.
    #[default]
    RaceEmptyPayload,
    /// Race the in progress payload process with this job.
    ///
    /// The boxed closure is the job: calling it yields either a payload or a
    /// [`PayloadBuilderError`].
    RacePayload(Box<dyn FnOnce() -> Result<Payload, PayloadBuilderError> + Send>),
}
923
924impl<Payload> fmt::Debug for MissingPayloadBehaviour<Payload> {
925    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
926        match self {
927            Self::AwaitInProgress => write!(f, "AwaitInProgress"),
928            Self::RaceEmptyPayload => {
929                write!(f, "RaceEmptyPayload")
930            }
931            Self::RacePayload(_) => write!(f, "RacePayload"),
932        }
933    }
934}
935
936/// Checks if the new payload is better than the current best.
937///
938/// This compares the total fees of the blocks, higher is better.
939#[inline(always)]
940pub fn is_better_payload<T: BuiltPayload>(best_payload: Option<&T>, new_fees: U256) -> bool {
941    if let Some(best_payload) = best_payload {
942        new_fees > best_payload.fees()
943    } else {
944        true
945    }
946}
947
/// Computes how much time remains until the given unix timestamp (in seconds).
///
/// Timestamps that already lie in the past yield `Duration::ZERO`. If the system clock reads
/// earlier than the unix epoch, the elapsed time is treated as zero and the full timestamp is
/// returned.
fn duration_until(unix_timestamp_secs: u64) -> Duration {
    let deadline = Duration::from_secs(unix_timestamp_secs);
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(deadline, |since_epoch| deadline.saturating_sub(since_epoch))
}