1use crate::{
7 metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8 PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{BlockTimestamp, B256};
12use alloy_rpc_types::engine::PayloadId;
13use futures_util::{future::FutureExt, Stream, StreamExt};
14use reth_chain_state::CanonStateNotification;
15use reth_execution_cache::SavedCache;
16use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
17use reth_payload_primitives::{BuiltPayload, PayloadAttributes, PayloadKind, PayloadTypes};
18use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
19use reth_trie_parallel::state_root_task::StateRootHandle;
20use std::{
21 future::Future,
22 pin::Pin,
23 sync::Arc,
24 task::{Context, Poll},
25};
26use tokio::sync::{
27 broadcast, mpsc,
28 oneshot::{self, Receiver},
29 watch,
30};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32use tracing::{debug, debug_span, info, trace, warn, Span};
33
34type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
35type PayloadJobEntry<Job> = (Job, PayloadId, Span);
36type ResolvePayloadResult<P, Job> = (Option<PayloadFuture<P>>, Option<PayloadJobEntry<Job>>);
37
38#[derive(Debug)]
43pub struct PayloadStore<T: PayloadTypes> {
44 inner: Arc<PayloadBuilderHandle<T>>,
45}
46
47impl<T> PayloadStore<T>
48where
49 T: PayloadTypes,
50{
51 pub fn resolve_kind(
56 &self,
57 id: PayloadId,
58 kind: PayloadKind,
59 ) -> impl Future<Output = Option<Result<T::BuiltPayload, PayloadBuilderError>>> {
60 self.inner.resolve_kind(id, kind)
61 }
62
63 pub async fn resolve(
65 &self,
66 id: PayloadId,
67 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
68 self.resolve_kind(id, PayloadKind::Earliest).await
69 }
70
71 pub async fn best_payload(
75 &self,
76 id: PayloadId,
77 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
78 self.inner.best_payload(id).await
79 }
80
81 pub async fn payload_timestamp(
85 &self,
86 id: PayloadId,
87 ) -> Option<Result<u64, PayloadBuilderError>> {
88 self.inner.payload_timestamp(id).await
89 }
90
91 pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
93 Self { inner: Arc::new(inner) }
94 }
95}
96
97impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
98where
99 T: PayloadTypes,
100{
101 fn from(inner: PayloadBuilderHandle<T>) -> Self {
102 Self::new(inner)
103 }
104}
105
106#[derive(Debug)]
110pub struct PayloadBuilderHandle<T: PayloadTypes> {
111 to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
113}
114
115impl<T: PayloadTypes> PayloadBuilderHandle<T> {
116 pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
121 Self { to_service }
122 }
123
124 pub fn send_new_payload(
128 &self,
129 input: BuildNewPayload<T::PayloadAttributes>,
130 ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
131 let (tx, rx) = oneshot::channel();
132 let span = debug_span!(parent: Span::current(), "payload_job");
133 let _ =
134 self.to_service.send(PayloadServiceCommand::BuildNewPayload(input.into(), span, tx));
135 rx
136 }
137
138 pub async fn best_payload(
141 &self,
142 id: PayloadId,
143 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
144 let (tx, rx) = oneshot::channel();
145 self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
146 rx.await.ok()?
147 }
148
149 pub fn resolve_kind(
157 &self,
158 id: PayloadId,
159 kind: PayloadKind,
160 ) -> impl Future<Output = Option<Result<T::BuiltPayload, PayloadBuilderError>>> {
161 let (tx, rx) = oneshot::channel();
162 let sent = self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).is_ok();
163 async move {
164 if !sent {
165 return None
166 }
167
168 match rx.await.transpose()? {
169 Ok(fut) => Some(fut.await),
170 Err(e) => Some(Err(e.into())),
171 }
172 }
173 }
174
175 pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
178 let (tx, rx) = oneshot::channel();
179 let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
180 Ok(PayloadEvents { receiver: rx.await? })
181 }
182
183 pub async fn payload_timestamp(
187 &self,
188 id: PayloadId,
189 ) -> Option<Result<u64, PayloadBuilderError>> {
190 let (tx, rx) = oneshot::channel();
191 self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
192 rx.await.ok()?
193 }
194}
195
196impl<T> Clone for PayloadBuilderHandle<T>
197where
198 T: PayloadTypes,
199{
200 fn clone(&self) -> Self {
201 Self { to_service: self.to_service.clone() }
202 }
203}
204
205#[derive(Debug)]
214#[must_use = "futures do nothing unless you `.await` or poll them"]
215pub struct PayloadBuilderService<Gen, St, T>
216where
217 T: PayloadTypes,
218 Gen: PayloadJobGenerator,
219 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
220{
221 generator: Gen,
223 payload_jobs: Vec<(Gen::Job, PayloadId, Span)>,
227 service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
229 command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
231 metrics: PayloadBuilderServiceMetrics,
233 chain_events: St,
235 payload_events: broadcast::Sender<Events<T>>,
237 cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
240 cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
242}
243
244const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
245
246impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
249where
250 T: PayloadTypes,
251 Gen: PayloadJobGenerator,
252 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
253 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
254{
255 pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
262 let (service_tx, command_rx) = mpsc::unbounded_channel();
263 let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
264
265 let (cached_payload_tx, cached_payload_rx) = watch::channel(None);
266
267 let service = Self {
268 generator,
269 payload_jobs: Vec::new(),
270 service_tx,
271 command_rx: UnboundedReceiverStream::new(command_rx),
272 metrics: Default::default(),
273 chain_events,
274 payload_events,
275 cached_payload_rx,
276 cached_payload_tx,
277 };
278
279 let handle = service.handle();
280 (service, handle)
281 }
282
283 pub fn handle(&self) -> PayloadBuilderHandle<T> {
285 PayloadBuilderHandle::new(self.service_tx.clone())
286 }
287
288 pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
291 self.payload_events.clone()
292 }
293
294 fn contains_payload(&self, id: PayloadId) -> bool {
296 self.payload_jobs.iter().any(|(_, job_id, _)| *job_id == id)
297 }
298
299 fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
301 let res = self
302 .payload_jobs
303 .iter()
304 .find(|(_, job_id, _)| *job_id == id)
305 .map(|(j, _, _)| j.best_payload().map(|p| p.into()));
306 if let Some(Ok(ref best)) = res {
307 self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
308 }
309
310 res
311 }
312
313 fn resolve(
318 &mut self,
319 id: PayloadId,
320 kind: PayloadKind,
321 ) -> ResolvePayloadResult<T::BuiltPayload, Gen::Job> {
322 let start = Instant::now();
323 debug!(target: "payload_builder", %id, "resolving payload job");
324
325 if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() &&
326 *cached == id
327 {
328 self.metrics.resolve_duration_seconds.record(start.elapsed());
329 return (Some(Box::pin(core::future::ready(Ok(payload.clone())))), None);
330 }
331
332 let Some(job) = self.payload_jobs.iter().position(|(_, job_id, _)| *job_id == id) else {
333 return (None, None)
334 };
335 let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
336 let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
337
338 let resolved_job =
339 (keep_alive == KeepPayloadJobAlive::No).then(|| self.payload_jobs.swap_remove(job));
340
341 let resolved_metrics = self.metrics.clone();
344 let payload_events = self.payload_events.clone();
345 let cached_payload_tx = self.cached_payload_tx.clone();
346
347 let fut = async move {
348 let res = fut.await;
349 resolved_metrics.resolve_duration_seconds.record(start.elapsed());
350 if let Ok(payload) = &res {
351 if payload_events.receiver_count() > 0 {
352 payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
353 }
354
355 if let Ok(timestamp) = payload_timestamp {
356 let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
357 }
358
359 resolved_metrics
360 .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
361 }
362 res.map(|p| p.into())
363 };
364
365 (Some(Box::pin(fut)), resolved_job)
366 }
367
368 fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
370 if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() &&
371 cached_id == id
372 {
373 return Some(Ok(timestamp));
374 }
375
376 let timestamp = self
377 .payload_jobs
378 .iter()
379 .find(|(_, job_id, _)| *job_id == id)
380 .map(|(j, _, _)| j.payload_timestamp());
381
382 if timestamp.is_none() {
383 trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
384 }
385
386 timestamp
387 }
388}
389
390impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
391where
392 T: PayloadTypes,
393 N: NodePrimitives,
394 Gen: PayloadJobGenerator + Unpin + 'static,
395 <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
396 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
397 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadAttributes>,
398 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
399{
400 type Output = ();
401
402 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
403 let this = self.get_mut();
404 loop {
405 while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
407 this.generator.on_new_state(new_head);
408 }
409
410 for idx in (0..this.payload_jobs.len()).rev() {
414 let (mut job, id, job_span) = this.payload_jobs.swap_remove(idx);
415
416 let poll_result = {
417 let _entered = job_span.enter();
418 job.poll_unpin(cx)
419 };
420
421 match poll_result {
422 Poll::Ready(Ok(_)) => {
423 this.metrics.set_active_jobs(this.payload_jobs.len());
424 trace!(target: "payload_builder", %id, "payload job finished");
425 }
426 Poll::Ready(Err(err)) => {
427 warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
428 this.metrics.inc_failed_jobs();
429 this.metrics.set_active_jobs(this.payload_jobs.len());
430 }
431 Poll::Pending => {
432 this.payload_jobs.push((job, id, job_span));
433 }
434 }
435 }
436
437 let mut new_job = false;
439
440 while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
442 match cmd {
443 PayloadServiceCommand::BuildNewPayload(input, job_span, tx) => {
444 let id = input.payload_id();
445 let mut res = Ok(id);
446 let parent = input.parent_hash;
447
448 if this.contains_payload(id) {
449 debug!(target: "payload_builder", %id, %parent, "Payload job already in progress, ignoring.");
450 } else {
451 let start = Instant::now();
452 let attributes = input.attributes.clone();
453 let job_result = {
454 let _entered = job_span.enter();
455 this.generator.new_payload_job(*input, id)
456 };
457
458 match job_result {
459 Ok(job) => {
460 this.metrics.new_job_duration_seconds.record(start.elapsed());
461 info!(target: "payload_builder", %id, %parent, "New payload job created");
462 this.metrics.inc_initiated_jobs();
463 new_job = true;
464 this.payload_jobs.push((job, id, job_span));
465 this.payload_events.send(Events::Attributes(attributes)).ok();
466
467 if this
471 .cached_payload_rx
472 .borrow()
473 .as_ref()
474 .is_some_and(|(cached_id, _, _)| *cached_id == id)
475 {
476 trace!(target: "payload_builder", %id, "clearing stale cached payload for reused payload id");
477 let _ = this.cached_payload_tx.send(None);
478 }
479 }
480 Err(err) => {
481 this.metrics.new_job_duration_seconds.record(start.elapsed());
482 this.metrics.inc_failed_jobs();
483 warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
484 res = Err(err);
485 }
486 }
487 }
488
489 let _ = tx.send(res);
490 }
491 PayloadServiceCommand::BestPayload(id, tx) => {
492 let _ = tx.send(this.best_payload(id));
493 }
494 PayloadServiceCommand::PayloadTimestamp(id, tx) => {
495 let timestamp = this.payload_timestamp(id);
496 let _ = tx.send(timestamp);
497 }
498 PayloadServiceCommand::Resolve(id, strategy, tx) => {
499 let (payload_fut, resolved_job) = this.resolve(id, strategy);
500 let _ = tx.send(payload_fut);
501
502 if let Some((_job, id, _job_span)) = resolved_job {
503 debug!(target: "payload_builder", %id, "terminated resolved job");
504 }
505 }
506 PayloadServiceCommand::Subscribe(tx) => {
507 let new_rx = this.payload_events.subscribe();
508 let _ = tx.send(new_rx);
509 }
510 }
511 }
512
513 if !new_job {
514 return Poll::Pending
515 }
516 }
517 }
518}
519
520#[derive(derive_more::Debug)]
522pub enum PayloadServiceCommand<T: PayloadTypes> {
523 BuildNewPayload(
528 Box<BuildNewPayload<T::PayloadAttributes>>,
529 Span,
530 oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
531 ),
532 BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
534 PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
536 Resolve(
538 PayloadId,
539 PayloadKind,
540 #[debug(skip)] oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
541 ),
542 Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
544}
545
546#[derive(Debug)]
548pub struct BuildNewPayload<T> {
549 pub attributes: T,
551 pub parent_hash: B256,
553 pub cache: Option<SavedCache>,
557 pub trie_handle: Option<StateRootHandle>,
559}
560
561impl<T: PayloadAttributes> BuildNewPayload<T> {
562 pub fn payload_id(&self) -> PayloadId {
564 self.attributes.payload_id(&self.parent_hash)
565 }
566}