1use crate::{
7 metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8 PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::BlockTimestamp;
12use alloy_rpc_types::engine::PayloadId;
13use futures_util::{future::FutureExt, Stream, StreamExt};
14use reth_chain_state::CanonStateNotification;
15use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
16use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
17use reth_primitives_traits::NodePrimitives;
18use std::{
19 fmt,
20 future::Future,
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24};
25use tokio::sync::{
26 broadcast, mpsc,
27 oneshot::{self, Receiver},
28 watch,
29};
30use tokio_stream::wrappers::UnboundedReceiverStream;
31use tracing::{debug, info, trace, warn};
32
33type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
34
35#[derive(Debug)]
40pub struct PayloadStore<T: PayloadTypes> {
41 inner: Arc<PayloadBuilderHandle<T>>,
42}
43
44impl<T> PayloadStore<T>
45where
46 T: PayloadTypes,
47{
48 pub async fn resolve_kind(
53 &self,
54 id: PayloadId,
55 kind: PayloadKind,
56 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
57 self.inner.resolve_kind(id, kind).await
58 }
59
60 pub async fn resolve(
62 &self,
63 id: PayloadId,
64 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
65 self.resolve_kind(id, PayloadKind::Earliest).await
66 }
67
68 pub async fn best_payload(
72 &self,
73 id: PayloadId,
74 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
75 self.inner.best_payload(id).await
76 }
77
78 pub async fn payload_timestamp(
82 &self,
83 id: PayloadId,
84 ) -> Option<Result<u64, PayloadBuilderError>> {
85 self.inner.payload_timestamp(id).await
86 }
87}
88
89impl<T> PayloadStore<T>
90where
91 T: PayloadTypes,
92{
93 pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
95 Self { inner: Arc::new(inner) }
96 }
97}
98
99impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
100where
101 T: PayloadTypes,
102{
103 fn from(inner: PayloadBuilderHandle<T>) -> Self {
104 Self::new(inner)
105 }
106}
107
108#[derive(Debug)]
112pub struct PayloadBuilderHandle<T: PayloadTypes> {
113 to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
115}
116
117impl<T: PayloadTypes> PayloadBuilderHandle<T> {
118 pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
123 Self { to_service }
124 }
125
126 pub fn send_new_payload(
130 &self,
131 attr: T::PayloadBuilderAttributes,
132 ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
133 let (tx, rx) = oneshot::channel();
134 let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
135 rx
136 }
137
138 pub async fn best_payload(
141 &self,
142 id: PayloadId,
143 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
144 let (tx, rx) = oneshot::channel();
145 self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
146 rx.await.ok()?
147 }
148
149 pub async fn resolve_kind(
151 &self,
152 id: PayloadId,
153 kind: PayloadKind,
154 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
155 let (tx, rx) = oneshot::channel();
156 self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
157 match rx.await.transpose()? {
158 Ok(fut) => Some(fut.await),
159 Err(e) => Some(Err(e.into())),
160 }
161 }
162
163 pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
166 let (tx, rx) = oneshot::channel();
167 let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
168 Ok(PayloadEvents { receiver: rx.await? })
169 }
170
171 pub async fn payload_timestamp(
175 &self,
176 id: PayloadId,
177 ) -> Option<Result<u64, PayloadBuilderError>> {
178 let (tx, rx) = oneshot::channel();
179 self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
180 rx.await.ok()?
181 }
182}
183
184impl<T> Clone for PayloadBuilderHandle<T>
185where
186 T: PayloadTypes,
187{
188 fn clone(&self) -> Self {
189 Self { to_service: self.to_service.clone() }
190 }
191}
192
193#[derive(Debug)]
202#[must_use = "futures do nothing unless you `.await` or poll them"]
203pub struct PayloadBuilderService<Gen, St, T>
204where
205 T: PayloadTypes,
206 Gen: PayloadJobGenerator,
207 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
208{
209 generator: Gen,
211 payload_jobs: Vec<(Gen::Job, PayloadId)>,
213 service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
215 command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
217 metrics: PayloadBuilderServiceMetrics,
219 chain_events: St,
221 payload_events: broadcast::Sender<Events<T>>,
223 cached_payload_rx: watch::Receiver<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
226 cached_payload_tx: watch::Sender<Option<(PayloadId, BlockTimestamp, T::BuiltPayload)>>,
228}
229
230const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
231
232impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
235where
236 T: PayloadTypes,
237 Gen: PayloadJobGenerator,
238 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
239 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
240{
241 pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
248 let (service_tx, command_rx) = mpsc::unbounded_channel();
249 let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
250
251 let (cached_payload_tx, cached_payload_rx) = watch::channel(None);
252
253 let service = Self {
254 generator,
255 payload_jobs: Vec::new(),
256 service_tx,
257 command_rx: UnboundedReceiverStream::new(command_rx),
258 metrics: Default::default(),
259 chain_events,
260 payload_events,
261 cached_payload_rx,
262 cached_payload_tx,
263 };
264
265 let handle = service.handle();
266 (service, handle)
267 }
268
269 pub fn handle(&self) -> PayloadBuilderHandle<T> {
271 PayloadBuilderHandle::new(self.service_tx.clone())
272 }
273
274 pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
277 self.payload_events.clone()
278 }
279
280 fn contains_payload(&self, id: PayloadId) -> bool {
282 self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
283 }
284
285 fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
287 let res = self
288 .payload_jobs
289 .iter()
290 .find(|(_, job_id)| *job_id == id)
291 .map(|(j, _)| j.best_payload().map(|p| p.into()));
292 if let Some(Ok(ref best)) = res {
293 self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
294 }
295
296 res
297 }
298
299 fn resolve(
302 &mut self,
303 id: PayloadId,
304 kind: PayloadKind,
305 ) -> Option<PayloadFuture<T::BuiltPayload>> {
306 debug!(target: "payload_builder", %id, "resolving payload job");
307
308 if let Some((cached, _, payload)) = &*self.cached_payload_rx.borrow() {
309 if *cached == id {
310 return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
311 }
312 }
313
314 let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
315 let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
316 let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
317
318 if keep_alive == KeepPayloadJobAlive::No {
319 let (_, id) = self.payload_jobs.swap_remove(job);
320 debug!(target: "payload_builder", %id, "terminated resolved job");
321 }
322
323 let resolved_metrics = self.metrics.clone();
326 let payload_events = self.payload_events.clone();
327 let cached_payload_tx = self.cached_payload_tx.clone();
328
329 let fut = async move {
330 let res = fut.await;
331 if let Ok(payload) = &res {
332 if payload_events.receiver_count() > 0 {
333 payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
334 }
335
336 if let Ok(timestamp) = payload_timestamp {
337 let _ = cached_payload_tx.send(Some((id, timestamp, payload.clone().into())));
338 }
339
340 resolved_metrics
341 .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
342 }
343 res.map(|p| p.into())
344 };
345
346 Some(Box::pin(fut))
347 }
348}
349
350impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
351where
352 T: PayloadTypes,
353 Gen: PayloadJobGenerator,
354 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
355 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
356{
357 fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
359 if let Some((cached_id, timestamp, _)) = *self.cached_payload_rx.borrow() {
360 if cached_id == id {
361 return Some(Ok(timestamp));
362 }
363 }
364
365 let timestamp = self
366 .payload_jobs
367 .iter()
368 .find(|(_, job_id)| *job_id == id)
369 .map(|(j, _)| j.payload_timestamp());
370
371 if timestamp.is_none() {
372 trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
373 }
374
375 timestamp
376 }
377}
378
379impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
380where
381 T: PayloadTypes,
382 N: NodePrimitives,
383 Gen: PayloadJobGenerator + Unpin + 'static,
384 <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
385 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
386 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
387 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
388{
389 type Output = ();
390
391 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
392 let this = self.get_mut();
393 loop {
394 while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
396 this.generator.on_new_state(new_head);
397 }
398
399 for idx in (0..this.payload_jobs.len()).rev() {
403 let (mut job, id) = this.payload_jobs.swap_remove(idx);
404
405 match job.poll_unpin(cx) {
407 Poll::Ready(Ok(_)) => {
408 this.metrics.set_active_jobs(this.payload_jobs.len());
409 trace!(target: "payload_builder", %id, "payload job finished");
410 }
411 Poll::Ready(Err(err)) => {
412 warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
413 this.metrics.inc_failed_jobs();
414 this.metrics.set_active_jobs(this.payload_jobs.len());
415 }
416 Poll::Pending => {
417 this.payload_jobs.push((job, id));
419 }
420 }
421 }
422
423 let mut new_job = false;
425
426 while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
428 match cmd {
429 PayloadServiceCommand::BuildNewPayload(attr, tx) => {
430 let id = attr.payload_id();
431 let mut res = Ok(id);
432
433 if this.contains_payload(id) {
434 debug!(target: "payload_builder",%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
435 } else {
436 let parent = attr.parent();
438 match this.generator.new_payload_job(attr.clone()) {
439 Ok(job) => {
440 info!(target: "payload_builder", %id, %parent, "New payload job created");
441 this.metrics.inc_initiated_jobs();
442 new_job = true;
443 this.payload_jobs.push((job, id));
444 this.payload_events.send(Events::Attributes(attr.clone())).ok();
445 }
446 Err(err) => {
447 this.metrics.inc_failed_jobs();
448 warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
449 res = Err(err);
450 }
451 }
452 }
453
454 let _ = tx.send(res);
456 }
457 PayloadServiceCommand::BestPayload(id, tx) => {
458 let _ = tx.send(this.best_payload(id));
459 }
460 PayloadServiceCommand::PayloadTimestamp(id, tx) => {
461 let timestamp = this.payload_timestamp(id);
462 let _ = tx.send(timestamp);
463 }
464 PayloadServiceCommand::Resolve(id, strategy, tx) => {
465 let _ = tx.send(this.resolve(id, strategy));
466 }
467 PayloadServiceCommand::Subscribe(tx) => {
468 let new_rx = this.payload_events.subscribe();
469 let _ = tx.send(new_rx);
470 }
471 }
472 }
473
474 if !new_job {
475 return Poll::Pending
476 }
477 }
478 }
479}
480
481pub enum PayloadServiceCommand<T: PayloadTypes> {
483 BuildNewPayload(
485 T::PayloadBuilderAttributes,
486 oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
487 ),
488 BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
490 PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
492 Resolve(
494 PayloadId,
495 PayloadKind,
496 oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
497 ),
498 Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
500}
501
502impl<T> fmt::Debug for PayloadServiceCommand<T>
503where
504 T: PayloadTypes,
505{
506 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
507 match self {
508 Self::BuildNewPayload(f0, f1) => {
509 f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
510 }
511 Self::BestPayload(f0, f1) => {
512 f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
513 }
514 Self::PayloadTimestamp(f0, f1) => {
515 f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
516 }
517 Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
518 Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
519 }
520 }
521}