1use crate::{
7 metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8 PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{BlockTimestamp, B256};
12use alloy_rpc_types::engine::PayloadId;
13use futures_util::{future::FutureExt, Stream, StreamExt};
14use reth_chain_state::CanonStateNotification;
15use reth_execution_cache::SavedCache;
16use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
17use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind, PayloadTypes};
18use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
19use reth_trie_parallel::state_root_task::StateRootHandle;
20use std::{
21 future::Future,
22 pin::Pin,
23 sync::Arc,
24 task::{Context, Poll},
25};
26use tokio::sync::{
27 broadcast, mpsc,
28 oneshot::{self, Receiver},
29 watch,
30};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32use tracing::{debug, debug_span, info, trace, warn, Span};
33
/// Boxed, pinned, `Send` future that resolves to a payload `P` or a [`PayloadBuilderError`].
type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
35
36#[derive(Debug)]
41pub struct PayloadStore<T: PayloadTypes> {
42 inner: Arc<PayloadBuilderHandle<T>>,
43}
44
45impl<T> PayloadStore<T>
46where
47 T: PayloadTypes,
48{
49 pub async fn resolve_kind(
54 &self,
55 id: PayloadId,
56 kind: PayloadKind,
57 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
58 self.inner.resolve_kind(id, kind).await
59 }
60
61 pub async fn resolve(
63 &self,
64 id: PayloadId,
65 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
66 self.resolve_kind(id, PayloadKind::Earliest).await
67 }
68
69 pub async fn best_payload(
73 &self,
74 id: PayloadId,
75 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
76 self.inner.best_payload(id).await
77 }
78
79 pub async fn payload_timestamp(
83 &self,
84 id: PayloadId,
85 ) -> Option<Result<u64, PayloadBuilderError>> {
86 self.inner.payload_timestamp(id).await
87 }
88
89 pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
91 Self { inner: Arc::new(inner) }
92 }
93}
94
95impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
96where
97 T: PayloadTypes,
98{
99 fn from(inner: PayloadBuilderHandle<T>) -> Self {
100 Self::new(inner)
101 }
102}
103
104#[derive(Debug)]
108pub struct PayloadBuilderHandle<T: PayloadTypes> {
109 to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
111}
112
113impl<T: PayloadTypes> PayloadBuilderHandle<T> {
114 pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
119 Self { to_service }
120 }
121
122 pub fn send_new_payload(
126 &self,
127 input: BuildNewPayload<T::PayloadAttributes>,
128 ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
129 let (tx, rx) = oneshot::channel();
130 let span = debug_span!(parent: Span::current(), "payload_job");
131 let _ =
132 self.to_service.send(PayloadServiceCommand::BuildNewPayload(input.into(), span, tx));
133 rx
134 }
135
136 pub async fn best_payload(
139 &self,
140 id: PayloadId,
141 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
142 let (tx, rx) = oneshot::channel();
143 self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
144 rx.await.ok()?
145 }
146
147 pub async fn resolve_kind(
149 &self,
150 id: PayloadId,
151 kind: PayloadKind,
152 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
153 let (tx, rx) = oneshot::channel();
154 self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
155 match rx.await.transpose()? {
156 Ok(fut) => Some(fut.await),
157 Err(e) => Some(Err(e.into())),
158 }
159 }
160
161 pub async fn resolve_kind_fut(
163 &self,
164 id: PayloadId,
165 kind: PayloadKind,
166 ) -> Result<Option<PayloadFuture<T::BuiltPayload>>, PayloadBuilderError> {
167 let (tx, rx) = oneshot::channel();
168 self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx))?;
169 rx.await.map_err(Into::into)
170 }
171
172 pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
175 let (tx, rx) = oneshot::channel();
176 let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
177 Ok(PayloadEvents { receiver: rx.await? })
178 }
179
180 pub async fn payload_timestamp(
184 &self,
185 id: PayloadId,
186 ) -> Option<Result<u64, PayloadBuilderError>> {
187 let (tx, rx) = oneshot::channel();
188 self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
189 rx.await.ok()?
190 }
191}
192
193impl<T> Clone for PayloadBuilderHandle<T>
194where
195 T: PayloadTypes,
196{
197 fn clone(&self) -> Self {
198 Self { to_service: self.to_service.clone() }
199 }
200}
201
/// Service that owns payload jobs and answers [`PayloadServiceCommand`]s.
///
/// It does its work when polled as a future: it forwards chain events to the generator, polls
/// every tracked job and then processes queued commands.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PayloadBuilderService<Gen, St, T>
where
    T: PayloadTypes,
    Gen: PayloadJobGenerator,
    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
{
    /// Creates new payload jobs and receives every item yielded by `chain_events`.
    generator: Gen,
    /// Tracked jobs, each with its payload id and the span entered while the job is polled.
    payload_jobs: Vec<(Gen::Job, PayloadId, Span)>,
    /// Sender half of the command channel; cloned to create [`PayloadBuilderHandle`]s.
    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
    /// Receiver half of the command channel.
    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
    /// Metrics recorded by the service.
    metrics: PayloadBuilderServiceMetrics,
    /// Stream whose items are handed to the generator's `on_new_state`.
    chain_events: St,
    /// Broadcast channel on which payload attributes and built payloads are published.
    payload_events: broadcast::Sender<Events<T>>,
    /// Read side of the cache holding the last successfully resolved payload, if any.
    cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
    /// Write side of the resolved-payload cache.
    cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
}
240
/// Capacity of the broadcast channel used to publish payload events.
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
242
243impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
246where
247 T: PayloadTypes,
248 Gen: PayloadJobGenerator,
249 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
250 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
251{
252 pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
259 let (service_tx, command_rx) = mpsc::unbounded_channel();
260 let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
261
262 let (cached_payload_tx, cached_payload_rx) = watch::channel(None);
263
264 let service = Self {
265 generator,
266 payload_jobs: Vec::new(),
267 service_tx,
268 command_rx: UnboundedReceiverStream::new(command_rx),
269 metrics: Default::default(),
270 chain_events,
271 payload_events,
272 cached_payload_rx,
273 cached_payload_tx,
274 };
275
276 let handle = service.handle();
277 (service, handle)
278 }
279
280 pub fn handle(&self) -> PayloadBuilderHandle<T> {
282 PayloadBuilderHandle::new(self.service_tx.clone())
283 }
284
285 pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
288 self.payload_events.clone()
289 }
290
291 fn contains_payload(&self, id: PayloadId) -> bool {
293 self.payload_jobs.iter().any(|(_, job_id, _)| *job_id == id)
294 }
295
296 fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
298 let res = self
299 .payload_jobs
300 .iter()
301 .find(|(_, job_id, _)| *job_id == id)
302 .map(|(j, _, _)| j.best_payload().map(|p| p.into()));
303 if let Some(Ok(ref best)) = res {
304 self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
305 }
306
307 res
308 }
309
310 fn resolve(
313 &mut self,
314 id: PayloadId,
315 kind: PayloadKind,
316 ) -> Option<PayloadFuture<T::BuiltPayload>> {
317 let start = Instant::now();
318 debug!(target: "payload_builder", %id, "resolving payload job");
319
320 if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() &&
321 *cached == id
322 {
323 self.metrics.resolve_duration_seconds.record(start.elapsed());
324 return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
325 }
326
327 let job = self.payload_jobs.iter().position(|(_, job_id, _)| *job_id == id)?;
328 let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
329 let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
330
331 if keep_alive == KeepPayloadJobAlive::No {
332 let (_, id, _) = self.payload_jobs.swap_remove(job);
333 debug!(target: "payload_builder", %id, "terminated resolved job");
334 }
335
336 let resolved_metrics = self.metrics.clone();
339 let payload_events = self.payload_events.clone();
340 let cached_payload_tx = self.cached_payload_tx.clone();
341
342 let fut = async move {
343 let res = fut.await;
344 resolved_metrics.resolve_duration_seconds.record(start.elapsed());
345 if let Ok(payload) = &res {
346 if payload_events.receiver_count() > 0 {
347 payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
348 }
349
350 if let Ok(timestamp) = payload_timestamp {
351 let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
352 }
353
354 resolved_metrics
355 .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
356 }
357 res.map(|p| p.into())
358 };
359
360 Some(Box::pin(fut))
361 }
362
363 fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
365 if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() &&
366 cached_id == id
367 {
368 return Some(Ok(timestamp));
369 }
370
371 let timestamp = self
372 .payload_jobs
373 .iter()
374 .find(|(_, job_id, _)| *job_id == id)
375 .map(|(j, _, _)| j.payload_timestamp());
376
377 if timestamp.is_none() {
378 trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
379 }
380
381 timestamp
382 }
383}
384
385impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
386where
387 T: PayloadTypes,
388 N: NodePrimitives,
389 Gen: PayloadJobGenerator + Unpin + 'static,
390 <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
391 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
392 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
393 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
394{
395 type Output = ();
396
397 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398 let this = self.get_mut();
399 loop {
400 while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
402 this.generator.on_new_state(new_head);
403 }
404
405 for idx in (0..this.payload_jobs.len()).rev() {
409 let (mut job, id, job_span) = this.payload_jobs.swap_remove(idx);
410
411 let poll_result = {
412 let _entered = job_span.enter();
413 job.poll_unpin(cx)
414 };
415
416 match poll_result {
417 Poll::Ready(Ok(_)) => {
418 this.metrics.set_active_jobs(this.payload_jobs.len());
419 trace!(target: "payload_builder", %id, "payload job finished");
420 }
421 Poll::Ready(Err(err)) => {
422 warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
423 this.metrics.inc_failed_jobs();
424 this.metrics.set_active_jobs(this.payload_jobs.len());
425 }
426 Poll::Pending => {
427 this.payload_jobs.push((job, id, job_span));
428 }
429 }
430 }
431
432 let mut new_job = false;
434
435 while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
437 match cmd {
438 PayloadServiceCommand::BuildNewPayload(input, job_span, tx) => {
439 let id = input.payload_id();
440 let mut res = Ok(id);
441 let parent = input.parent_hash;
442
443 if this.contains_payload(id) {
444 debug!(target: "payload_builder", %id, %parent, "Payload job already in progress, ignoring.");
445 } else {
446 let start = Instant::now();
447 let attributes = input.attributes.clone();
448 let job_result = {
449 let _entered = job_span.enter();
450 this.generator.new_payload_job(*input, id)
451 };
452
453 match job_result {
454 Ok(job) => {
455 this.metrics.new_job_duration_seconds.record(start.elapsed());
456 info!(target: "payload_builder", %id, %parent, "New payload job created");
457 this.metrics.inc_initiated_jobs();
458 new_job = true;
459 this.payload_jobs.push((job, id, job_span));
460 this.payload_events.send(Events::Attributes(attributes)).ok();
461
462 if this
466 .cached_payload_rx
467 .borrow()
468 .as_ref()
469 .is_some_and(|(cached_id, _, _)| *cached_id == id)
470 {
471 trace!(target: "payload_builder", %id, "clearing stale cached payload for reused payload id");
472 let _ = this.cached_payload_tx.send(None);
473 }
474 }
475 Err(err) => {
476 this.metrics.new_job_duration_seconds.record(start.elapsed());
477 this.metrics.inc_failed_jobs();
478 warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
479 res = Err(err);
480 }
481 }
482 }
483
484 let _ = tx.send(res);
485 }
486 PayloadServiceCommand::BestPayload(id, tx) => {
487 let _ = tx.send(this.best_payload(id));
488 }
489 PayloadServiceCommand::PayloadTimestamp(id, tx) => {
490 let timestamp = this.payload_timestamp(id);
491 let _ = tx.send(timestamp);
492 }
493 PayloadServiceCommand::Resolve(id, strategy, tx) => {
494 let _ = tx.send(this.resolve(id, strategy));
495 }
496 PayloadServiceCommand::Subscribe(tx) => {
497 let new_rx = this.payload_events.subscribe();
498 let _ = tx.send(new_rx);
499 }
500 }
501 }
502
503 if !new_job {
504 return Poll::Pending
505 }
506 }
507 }
508}
509
/// Message sent from a [`PayloadBuilderHandle`] to the [`PayloadBuilderService`].
#[derive(derive_more::Debug)]
pub enum PayloadServiceCommand<T: PayloadTypes> {
    /// Start a new payload job.
    ///
    /// Carries the boxed build input, the span entered while the job is created and polled,
    /// and the sender that receives the payload id or the creation error.
    BuildNewPayload(
        Box<BuildNewPayload<T::PayloadAttributes>>,
        Span,
        oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
    ),
    /// Request the best payload of the given job; the reply is `None` if the job is not tracked.
    BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
    /// Request the timestamp of the given payload; the reply is `None` if it is unknown.
    PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
    /// Resolve the given job with the given kind; the reply is a future yielding the payload,
    /// or `None` if the payload is unknown. The sender is omitted from `Debug` output.
    Resolve(
        PayloadId,
        PayloadKind,
        #[debug(skip)] oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
    ),
    /// Request a new receiver for the service's payload events.
    Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
}
535
/// Input for starting a new payload job.
#[derive(Debug)]
pub struct BuildNewPayload<T> {
    /// Payload attributes; combined with `parent_hash` to derive the payload id.
    pub attributes: T,
    /// Hash passed to the attributes when deriving the payload id.
    pub parent_hash: B256,
    /// Optional saved cache. NOTE(review): presumably an execution cache the job can reuse;
    /// confirm against the job generator.
    pub cache: Option<SavedCache>,
    /// Optional state-root handle. NOTE(review): presumably lets the job use a running
    /// state-root task; confirm against the job generator.
    pub trie_handle: Option<StateRootHandle>,
}
550
551impl<T: PayloadAttributes> BuildNewPayload<T> {
552 pub fn payload_id(&self) -> PayloadId {
554 self.attributes.payload_id(&self.parent_hash)
555 }
556}