1use crate::{
7 metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8 PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{BlockTimestamp, B256};
12use alloy_rpc_types::engine::PayloadId;
13use futures_util::{future::FutureExt, Stream, StreamExt};
14use reth_chain_state::CanonStateNotification;
15use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
16use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind, PayloadTypes};
17use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
18use std::{
19 fmt,
20 future::Future,
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24};
25use tokio::sync::{
26 broadcast, mpsc,
27 oneshot::{self, Receiver},
28 watch,
29};
30use tokio_stream::wrappers::UnboundedReceiverStream;
31use tracing::{debug, debug_span, info, trace, warn, Span};
32
33type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
34
35#[derive(Debug)]
40pub struct PayloadStore<T: PayloadTypes> {
41 inner: Arc<PayloadBuilderHandle<T>>,
42}
43
44impl<T> PayloadStore<T>
45where
46 T: PayloadTypes,
47{
48 pub async fn resolve_kind(
53 &self,
54 id: PayloadId,
55 kind: PayloadKind,
56 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
57 self.inner.resolve_kind(id, kind).await
58 }
59
60 pub async fn resolve(
62 &self,
63 id: PayloadId,
64 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
65 self.resolve_kind(id, PayloadKind::Earliest).await
66 }
67
68 pub async fn best_payload(
72 &self,
73 id: PayloadId,
74 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
75 self.inner.best_payload(id).await
76 }
77
78 pub async fn payload_timestamp(
82 &self,
83 id: PayloadId,
84 ) -> Option<Result<u64, PayloadBuilderError>> {
85 self.inner.payload_timestamp(id).await
86 }
87
88 pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
90 Self { inner: Arc::new(inner) }
91 }
92}
93
94impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
95where
96 T: PayloadTypes,
97{
98 fn from(inner: PayloadBuilderHandle<T>) -> Self {
99 Self::new(inner)
100 }
101}
102
103#[derive(Debug)]
107pub struct PayloadBuilderHandle<T: PayloadTypes> {
108 to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
110}
111
112impl<T: PayloadTypes> PayloadBuilderHandle<T> {
113 pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
118 Self { to_service }
119 }
120
121 pub fn send_new_payload(
125 &self,
126 parent: B256,
127 attr: T::PayloadAttributes,
128 ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
129 let (tx, rx) = oneshot::channel();
130 let job_span = debug_span!(parent: Span::current(), "payload_job");
131 let _ = self
132 .to_service
133 .send(PayloadServiceCommand::BuildNewPayload(parent, attr, job_span, tx));
134 rx
135 }
136
137 pub async fn best_payload(
140 &self,
141 id: PayloadId,
142 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
143 let (tx, rx) = oneshot::channel();
144 self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
145 rx.await.ok()?
146 }
147
148 pub async fn resolve_kind(
150 &self,
151 id: PayloadId,
152 kind: PayloadKind,
153 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
154 let (tx, rx) = oneshot::channel();
155 self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
156 match rx.await.transpose()? {
157 Ok(fut) => Some(fut.await),
158 Err(e) => Some(Err(e.into())),
159 }
160 }
161
162 pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
165 let (tx, rx) = oneshot::channel();
166 let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
167 Ok(PayloadEvents { receiver: rx.await? })
168 }
169
170 pub async fn payload_timestamp(
174 &self,
175 id: PayloadId,
176 ) -> Option<Result<u64, PayloadBuilderError>> {
177 let (tx, rx) = oneshot::channel();
178 self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
179 rx.await.ok()?
180 }
181}
182
183impl<T> Clone for PayloadBuilderHandle<T>
184where
185 T: PayloadTypes,
186{
187 fn clone(&self) -> Self {
188 Self { to_service: self.to_service.clone() }
189 }
190}
191
/// The service that owns and drives all payload jobs.
///
/// It is a [`Future`] that has to be polled: each poll forwards chain events to the job
/// generator, polls the tracked jobs and answers the commands sent through
/// [`PayloadBuilderHandle`]s.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PayloadBuilderService<Gen, St, T>
where
    T: PayloadTypes,
    Gen: PayloadJobGenerator,
    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
{
    /// Creates new payload jobs and is notified about new chain state.
    generator: Gen,
    /// Jobs currently tracked by the service, each with its payload id and tracing span.
    payload_jobs: Vec<(Gen::Job, PayloadId, Span)>,
    /// Sending half of the command channel; cloned whenever a new handle is created.
    service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
    /// Receiving half of the command channel, drained on every poll.
    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
    /// Metrics recorded by the service.
    metrics: PayloadBuilderServiceMetrics,
    /// Stream of chain notifications that are forwarded to the generator.
    chain_events: St,
    /// Broadcast sender used to publish payload events to subscribers.
    payload_events: broadcast::Sender<Events<T>>,
    /// Read side of the cache holding the most recently resolved payload, together with its id
    /// and timestamp.
    cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
    /// Write side of the resolved-payload cache; written by resolved payload futures and cleared
    /// when a new job reuses the cached id.
    cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
}
230
/// Capacity of the broadcast channel over which payload events are published.
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
232
233impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
236where
237 T: PayloadTypes,
238 Gen: PayloadJobGenerator,
239 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
240 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
241{
242 pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
249 let (service_tx, command_rx) = mpsc::unbounded_channel();
250 let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
251
252 let (cached_payload_tx, cached_payload_rx) = watch::channel(None);
253
254 let service = Self {
255 generator,
256 payload_jobs: Vec::new(),
257 service_tx,
258 command_rx: UnboundedReceiverStream::new(command_rx),
259 metrics: Default::default(),
260 chain_events,
261 payload_events,
262 cached_payload_rx,
263 cached_payload_tx,
264 };
265
266 let handle = service.handle();
267 (service, handle)
268 }
269
270 pub fn handle(&self) -> PayloadBuilderHandle<T> {
272 PayloadBuilderHandle::new(self.service_tx.clone())
273 }
274
275 pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
278 self.payload_events.clone()
279 }
280
281 fn contains_payload(&self, id: PayloadId) -> bool {
283 self.payload_jobs.iter().any(|(_, job_id, _)| *job_id == id)
284 }
285
286 fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
288 let res = self
289 .payload_jobs
290 .iter()
291 .find(|(_, job_id, _)| *job_id == id)
292 .map(|(j, _, _)| j.best_payload().map(|p| p.into()));
293 if let Some(Ok(ref best)) = res {
294 self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
295 }
296
297 res
298 }
299
300 fn resolve(
303 &mut self,
304 id: PayloadId,
305 kind: PayloadKind,
306 ) -> Option<PayloadFuture<T::BuiltPayload>> {
307 let start = Instant::now();
308 debug!(target: "payload_builder", %id, "resolving payload job");
309
310 if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() &&
311 *cached == id
312 {
313 self.metrics.resolve_duration_seconds.record(start.elapsed());
314 return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
315 }
316
317 let job = self.payload_jobs.iter().position(|(_, job_id, _)| *job_id == id)?;
318 let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
319 let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
320
321 if keep_alive == KeepPayloadJobAlive::No {
322 let (_, id, _) = self.payload_jobs.swap_remove(job);
323 debug!(target: "payload_builder", %id, "terminated resolved job");
324 }
325
326 let resolved_metrics = self.metrics.clone();
329 let payload_events = self.payload_events.clone();
330 let cached_payload_tx = self.cached_payload_tx.clone();
331
332 let fut = async move {
333 let res = fut.await;
334 resolved_metrics.resolve_duration_seconds.record(start.elapsed());
335 if let Ok(payload) = &res {
336 if payload_events.receiver_count() > 0 {
337 payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
338 }
339
340 if let Ok(timestamp) = payload_timestamp {
341 let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
342 }
343
344 resolved_metrics
345 .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
346 }
347 res.map(|p| p.into())
348 };
349
350 Some(Box::pin(fut))
351 }
352
353 fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
355 if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() &&
356 cached_id == id
357 {
358 return Some(Ok(timestamp));
359 }
360
361 let timestamp = self
362 .payload_jobs
363 .iter()
364 .find(|(_, job_id, _)| *job_id == id)
365 .map(|(j, _, _)| j.payload_timestamp());
366
367 if timestamp.is_none() {
368 trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
369 }
370
371 timestamp
372 }
373}
374
375impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
376where
377 T: PayloadTypes,
378 N: NodePrimitives,
379 Gen: PayloadJobGenerator + Unpin + 'static,
380 <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
381 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
382 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
383 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
384{
385 type Output = ();
386
387 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
388 let this = self.get_mut();
389 loop {
390 while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
392 this.generator.on_new_state(new_head);
393 }
394
395 for idx in (0..this.payload_jobs.len()).rev() {
399 let (mut job, id, job_span) = this.payload_jobs.swap_remove(idx);
400
401 let poll_result = {
402 let _entered = job_span.enter();
403 job.poll_unpin(cx)
404 };
405
406 match poll_result {
407 Poll::Ready(Ok(_)) => {
408 this.metrics.set_active_jobs(this.payload_jobs.len());
409 trace!(target: "payload_builder", %id, "payload job finished");
410 }
411 Poll::Ready(Err(err)) => {
412 warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
413 this.metrics.inc_failed_jobs();
414 this.metrics.set_active_jobs(this.payload_jobs.len());
415 }
416 Poll::Pending => {
417 this.payload_jobs.push((job, id, job_span));
418 }
419 }
420 }
421
422 let mut new_job = false;
424
425 while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
427 match cmd {
428 PayloadServiceCommand::BuildNewPayload(parent, attr, job_span, tx) => {
429 let id = attr.payload_id(&parent);
430 let mut res = Ok(id);
431
432 if this.contains_payload(id) {
433 debug!(target: "payload_builder", %id, %parent, "Payload job already in progress, ignoring.");
434 } else {
435 let start = Instant::now();
436 let job_result = {
437 let _entered = job_span.enter();
438 this.generator.new_payload_job(parent, attr.clone(), id)
439 };
440
441 match job_result {
442 Ok(job) => {
443 this.metrics.new_job_duration_seconds.record(start.elapsed());
444 info!(target: "payload_builder", %id, %parent, "New payload job created");
445 this.metrics.inc_initiated_jobs();
446 new_job = true;
447 this.payload_jobs.push((job, id, job_span));
448 this.payload_events.send(Events::Attributes(attr)).ok();
449
450 if this
454 .cached_payload_rx
455 .borrow()
456 .as_ref()
457 .is_some_and(|(cached_id, _, _)| *cached_id == id)
458 {
459 trace!(target: "payload_builder", %id, "clearing stale cached payload for reused payload id");
460 let _ = this.cached_payload_tx.send(None);
461 }
462 }
463 Err(err) => {
464 this.metrics.new_job_duration_seconds.record(start.elapsed());
465 this.metrics.inc_failed_jobs();
466 warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
467 res = Err(err);
468 }
469 }
470 }
471
472 let _ = tx.send(res);
473 }
474 PayloadServiceCommand::BestPayload(id, tx) => {
475 let _ = tx.send(this.best_payload(id));
476 }
477 PayloadServiceCommand::PayloadTimestamp(id, tx) => {
478 let timestamp = this.payload_timestamp(id);
479 let _ = tx.send(timestamp);
480 }
481 PayloadServiceCommand::Resolve(id, strategy, tx) => {
482 let _ = tx.send(this.resolve(id, strategy));
483 }
484 PayloadServiceCommand::Subscribe(tx) => {
485 let new_rx = this.payload_events.subscribe();
486 let _ = tx.send(new_rx);
487 }
488 }
489 }
490
491 if !new_job {
492 return Poll::Pending
493 }
494 }
495 }
496}
497
498pub enum PayloadServiceCommand<T: PayloadTypes> {
500 BuildNewPayload(
505 B256,
506 T::PayloadAttributes,
507 Span,
508 oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
509 ),
510 BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
512 PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
514 Resolve(
516 PayloadId,
517 PayloadKind,
518 oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
519 ),
520 Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
522}
523
524impl<T> fmt::Debug for PayloadServiceCommand<T>
525where
526 T: PayloadTypes,
527{
528 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
529 match self {
530 Self::BuildNewPayload(f0, f1, _, f2) => {
531 f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).field(&f2).finish()
532 }
533 Self::BestPayload(f0, f1) => {
534 f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
535 }
536 Self::PayloadTimestamp(f0, f1) => {
537 f.debug_tuple("PayloadTimestamp").field(&f0).field(&f1).finish()
538 }
539 Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
540 Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
541 }
542 }
543}