1use crate::{
7 metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8 PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_rpc_types::engine::PayloadId;
12use futures_util::{future::FutureExt, Stream, StreamExt};
13use reth_chain_state::CanonStateNotification;
14use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
15use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
16use reth_primitives_traits::NodePrimitives;
17use std::{
18 fmt,
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23};
24use tokio::sync::{
25 broadcast, mpsc,
26 oneshot::{self, Receiver},
27};
28use tokio_stream::wrappers::UnboundedReceiverStream;
29use tracing::{debug, info, trace, warn};
30
/// A pinned, boxed, `Send + Sync` future whose output is either a payload of type `P` or a
/// [`PayloadBuilderError`].
type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
32
33#[derive(Debug)]
38pub struct PayloadStore<T: PayloadTypes> {
39 inner: Arc<PayloadBuilderHandle<T>>,
40}
41
42impl<T> PayloadStore<T>
43where
44 T: PayloadTypes,
45{
46 pub async fn resolve_kind(
51 &self,
52 id: PayloadId,
53 kind: PayloadKind,
54 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
55 self.inner.resolve_kind(id, kind).await
56 }
57
58 pub async fn resolve(
60 &self,
61 id: PayloadId,
62 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
63 self.resolve_kind(id, PayloadKind::Earliest).await
64 }
65
66 pub async fn best_payload(
70 &self,
71 id: PayloadId,
72 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
73 self.inner.best_payload(id).await
74 }
75
76 pub async fn payload_attributes(
80 &self,
81 id: PayloadId,
82 ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
83 self.inner.payload_attributes(id).await
84 }
85}
86
87impl<T> PayloadStore<T>
88where
89 T: PayloadTypes,
90{
91 pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
93 Self { inner: Arc::new(inner) }
94 }
95}
96
97impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
98where
99 T: PayloadTypes,
100{
101 fn from(inner: PayloadBuilderHandle<T>) -> Self {
102 Self::new(inner)
103 }
104}
105
106#[derive(Debug)]
110pub struct PayloadBuilderHandle<T: PayloadTypes> {
111 to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
113}
114
115impl<T: PayloadTypes> PayloadBuilderHandle<T> {
116 pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
121 Self { to_service }
122 }
123
124 pub fn send_new_payload(
128 &self,
129 attr: T::PayloadBuilderAttributes,
130 ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
131 let (tx, rx) = oneshot::channel();
132 let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
133 rx
134 }
135
136 pub async fn best_payload(
139 &self,
140 id: PayloadId,
141 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
142 let (tx, rx) = oneshot::channel();
143 self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
144 rx.await.ok()?
145 }
146
147 pub async fn resolve_kind(
149 &self,
150 id: PayloadId,
151 kind: PayloadKind,
152 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
153 let (tx, rx) = oneshot::channel();
154 self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
155 match rx.await.transpose()? {
156 Ok(fut) => Some(fut.await),
157 Err(e) => Some(Err(e.into())),
158 }
159 }
160
161 pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
164 let (tx, rx) = oneshot::channel();
165 let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
166 Ok(PayloadEvents { receiver: rx.await? })
167 }
168
169 pub async fn payload_attributes(
173 &self,
174 id: PayloadId,
175 ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
176 let (tx, rx) = oneshot::channel();
177 self.to_service.send(PayloadServiceCommand::PayloadAttributes(id, tx)).ok()?;
178 rx.await.ok()?
179 }
180}
181
182impl<T> Clone for PayloadBuilderHandle<T>
183where
184 T: PayloadTypes,
185{
186 fn clone(&self) -> Self {
187 Self { to_service: self.to_service.clone() }
188 }
189}
190
191#[derive(Debug)]
200#[must_use = "futures do nothing unless you `.await` or poll them"]
201pub struct PayloadBuilderService<Gen, St, T>
202where
203 T: PayloadTypes,
204 Gen: PayloadJobGenerator,
205 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
206{
207 generator: Gen,
209 payload_jobs: Vec<(Gen::Job, PayloadId)>,
211 service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
213 command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
215 metrics: PayloadBuilderServiceMetrics,
217 chain_events: St,
219 payload_events: broadcast::Sender<Events<T>>,
221}
222
223const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
224
225impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
228where
229 T: PayloadTypes,
230 Gen: PayloadJobGenerator,
231 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
232 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
233{
234 pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
241 let (service_tx, command_rx) = mpsc::unbounded_channel();
242 let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
243
244 let service = Self {
245 generator,
246 payload_jobs: Vec::new(),
247 service_tx,
248 command_rx: UnboundedReceiverStream::new(command_rx),
249 metrics: Default::default(),
250 chain_events,
251 payload_events,
252 };
253
254 let handle = service.handle();
255 (service, handle)
256 }
257
258 pub fn handle(&self) -> PayloadBuilderHandle<T> {
260 PayloadBuilderHandle::new(self.service_tx.clone())
261 }
262
263 fn contains_payload(&self, id: PayloadId) -> bool {
265 self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
266 }
267
268 fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
270 let res = self
271 .payload_jobs
272 .iter()
273 .find(|(_, job_id)| *job_id == id)
274 .map(|(j, _)| j.best_payload().map(|p| p.into()));
275 if let Some(Ok(ref best)) = res {
276 self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
277 }
278
279 res
280 }
281
282 fn resolve(
285 &mut self,
286 id: PayloadId,
287 kind: PayloadKind,
288 ) -> Option<PayloadFuture<T::BuiltPayload>> {
289 debug!(target: "payload_builder", %id, "resolving payload job");
290
291 let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
292 let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
293
294 if keep_alive == KeepPayloadJobAlive::No {
295 let (_, id) = self.payload_jobs.swap_remove(job);
296 debug!(target: "payload_builder", %id, "terminated resolved job");
297 }
298
299 let resolved_metrics = self.metrics.clone();
302 let payload_events = self.payload_events.clone();
303
304 let fut = async move {
305 let res = fut.await;
306 if let Ok(payload) = &res {
307 if payload_events.receiver_count() > 0 {
308 payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
309 }
310
311 resolved_metrics
312 .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
313 }
314 res.map(|p| p.into())
315 };
316
317 Some(Box::pin(fut))
318 }
319}
320
321impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
322where
323 T: PayloadTypes,
324 Gen: PayloadJobGenerator,
325 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
326 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
327{
328 fn payload_attributes(
330 &self,
331 id: PayloadId,
332 ) -> Option<Result<<Gen::Job as PayloadJob>::PayloadAttributes, PayloadBuilderError>> {
333 let attributes = self
334 .payload_jobs
335 .iter()
336 .find(|(_, job_id)| *job_id == id)
337 .map(|(j, _)| j.payload_attributes());
338
339 if attributes.is_none() {
340 trace!(%id, "no matching payload job found to get attributes for");
341 }
342
343 attributes
344 }
345}
346
347impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
348where
349 T: PayloadTypes,
350 N: NodePrimitives,
351 Gen: PayloadJobGenerator + Unpin + 'static,
352 <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
353 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
354 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
355 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
356{
357 type Output = ();
358
359 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
360 let this = self.get_mut();
361 loop {
362 while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
364 this.generator.on_new_state(new_head);
365 }
366
367 for idx in (0..this.payload_jobs.len()).rev() {
371 let (mut job, id) = this.payload_jobs.swap_remove(idx);
372
373 match job.poll_unpin(cx) {
375 Poll::Ready(Ok(_)) => {
376 this.metrics.set_active_jobs(this.payload_jobs.len());
377 trace!(%id, "payload job finished");
378 }
379 Poll::Ready(Err(err)) => {
380 warn!(%err, ?id, "Payload builder job failed; resolving payload");
381 this.metrics.inc_failed_jobs();
382 this.metrics.set_active_jobs(this.payload_jobs.len());
383 }
384 Poll::Pending => {
385 this.payload_jobs.push((job, id));
387 }
388 }
389 }
390
391 let mut new_job = false;
393
394 while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
396 match cmd {
397 PayloadServiceCommand::BuildNewPayload(attr, tx) => {
398 let id = attr.payload_id();
399 let mut res = Ok(id);
400
401 if this.contains_payload(id) {
402 debug!(%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
403 } else {
404 let parent = attr.parent();
406 match this.generator.new_payload_job(attr.clone()) {
407 Ok(job) => {
408 info!(%id, %parent, "New payload job created");
409 this.metrics.inc_initiated_jobs();
410 new_job = true;
411 this.payload_jobs.push((job, id));
412 this.payload_events.send(Events::Attributes(attr.clone())).ok();
413 }
414 Err(err) => {
415 this.metrics.inc_failed_jobs();
416 warn!(%err, %id, "Failed to create payload builder job");
417 res = Err(err);
418 }
419 }
420 }
421
422 let _ = tx.send(res);
424 }
425 PayloadServiceCommand::BestPayload(id, tx) => {
426 let _ = tx.send(this.best_payload(id));
427 }
428 PayloadServiceCommand::PayloadAttributes(id, tx) => {
429 let attributes = this.payload_attributes(id);
430 let _ = tx.send(attributes);
431 }
432 PayloadServiceCommand::Resolve(id, strategy, tx) => {
433 let _ = tx.send(this.resolve(id, strategy));
434 }
435 PayloadServiceCommand::Subscribe(tx) => {
436 let new_rx = this.payload_events.subscribe();
437 let _ = tx.send(new_rx);
438 }
439 }
440 }
441
442 if !new_job {
443 return Poll::Pending
444 }
445 }
446 }
447}
448
449pub enum PayloadServiceCommand<T: PayloadTypes> {
451 BuildNewPayload(
453 T::PayloadBuilderAttributes,
454 oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
455 ),
456 BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
458 PayloadAttributes(
460 PayloadId,
461 oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
462 ),
463 Resolve(
465 PayloadId,
466 PayloadKind,
467 oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
468 ),
469 Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
471}
472
473impl<T> fmt::Debug for PayloadServiceCommand<T>
474where
475 T: PayloadTypes,
476{
477 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
478 match self {
479 Self::BuildNewPayload(f0, f1) => {
480 f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
481 }
482 Self::BestPayload(f0, f1) => {
483 f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
484 }
485 Self::PayloadAttributes(f0, f1) => {
486 f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
487 }
488 Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
489 Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
490 }
491 }
492}