1use crate::{
7 metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8 PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_rpc_types::engine::PayloadId;
12use futures_util::{future::FutureExt, Stream, StreamExt};
13use reth_chain_state::CanonStateNotification;
14use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
15use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
16use reth_primitives_traits::NodePrimitives;
17use std::{
18 fmt,
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23};
24use tokio::sync::{
25 broadcast, mpsc,
26 oneshot::{self, Receiver},
27};
28use tokio_stream::wrappers::UnboundedReceiverStream;
29use tracing::{debug, info, trace, warn};
30
31type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
32
33#[derive(Debug)]
38pub struct PayloadStore<T: PayloadTypes> {
39 inner: Arc<PayloadBuilderHandle<T>>,
40}
41
42impl<T> PayloadStore<T>
43where
44 T: PayloadTypes,
45{
46 pub async fn resolve_kind(
51 &self,
52 id: PayloadId,
53 kind: PayloadKind,
54 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
55 self.inner.resolve_kind(id, kind).await
56 }
57
58 pub async fn resolve(
60 &self,
61 id: PayloadId,
62 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
63 self.resolve_kind(id, PayloadKind::Earliest).await
64 }
65
66 pub async fn best_payload(
70 &self,
71 id: PayloadId,
72 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
73 self.inner.best_payload(id).await
74 }
75
76 pub async fn payload_attributes(
80 &self,
81 id: PayloadId,
82 ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
83 self.inner.payload_attributes(id).await
84 }
85}
86
87impl<T> PayloadStore<T>
88where
89 T: PayloadTypes,
90{
91 pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
93 Self { inner: Arc::new(inner) }
94 }
95}
96
97impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
98where
99 T: PayloadTypes,
100{
101 fn from(inner: PayloadBuilderHandle<T>) -> Self {
102 Self::new(inner)
103 }
104}
105
106#[derive(Debug)]
110pub struct PayloadBuilderHandle<T: PayloadTypes> {
111 to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
113}
114
115impl<T: PayloadTypes> PayloadBuilderHandle<T> {
116 pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
121 Self { to_service }
122 }
123
124 pub fn send_new_payload(
128 &self,
129 attr: T::PayloadBuilderAttributes,
130 ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
131 let (tx, rx) = oneshot::channel();
132 let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
133 rx
134 }
135
136 pub async fn best_payload(
139 &self,
140 id: PayloadId,
141 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
142 let (tx, rx) = oneshot::channel();
143 self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
144 rx.await.ok()?
145 }
146
147 pub async fn resolve_kind(
149 &self,
150 id: PayloadId,
151 kind: PayloadKind,
152 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
153 let (tx, rx) = oneshot::channel();
154 self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
155 match rx.await.transpose()? {
156 Ok(fut) => Some(fut.await),
157 Err(e) => Some(Err(e.into())),
158 }
159 }
160
161 pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
164 let (tx, rx) = oneshot::channel();
165 let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
166 Ok(PayloadEvents { receiver: rx.await? })
167 }
168
169 pub async fn payload_attributes(
173 &self,
174 id: PayloadId,
175 ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
176 let (tx, rx) = oneshot::channel();
177 self.to_service.send(PayloadServiceCommand::PayloadAttributes(id, tx)).ok()?;
178 rx.await.ok()?
179 }
180}
181
182impl<T> Clone for PayloadBuilderHandle<T>
183where
184 T: PayloadTypes,
185{
186 fn clone(&self) -> Self {
187 Self { to_service: self.to_service.clone() }
188 }
189}
190
191#[derive(Debug)]
200#[must_use = "futures do nothing unless you `.await` or poll them"]
201pub struct PayloadBuilderService<Gen, St, T>
202where
203 T: PayloadTypes,
204 Gen: PayloadJobGenerator,
205 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
206{
207 generator: Gen,
209 payload_jobs: Vec<(Gen::Job, PayloadId)>,
211 service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
213 command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
215 metrics: PayloadBuilderServiceMetrics,
217 chain_events: St,
219 payload_events: broadcast::Sender<Events<T>>,
221}
222
223const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
224
225impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
228where
229 T: PayloadTypes,
230 Gen: PayloadJobGenerator,
231 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
232 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
233{
234 pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
241 let (service_tx, command_rx) = mpsc::unbounded_channel();
242 let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
243
244 let service = Self {
245 generator,
246 payload_jobs: Vec::new(),
247 service_tx,
248 command_rx: UnboundedReceiverStream::new(command_rx),
249 metrics: Default::default(),
250 chain_events,
251 payload_events,
252 };
253
254 let handle = service.handle();
255 (service, handle)
256 }
257
258 pub fn handle(&self) -> PayloadBuilderHandle<T> {
260 PayloadBuilderHandle::new(self.service_tx.clone())
261 }
262
263 fn contains_payload(&self, id: PayloadId) -> bool {
265 self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
266 }
267
268 fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
270 let res = self
271 .payload_jobs
272 .iter()
273 .find(|(_, job_id)| *job_id == id)
274 .map(|(j, _)| j.best_payload().map(|p| p.into()));
275 if let Some(Ok(ref best)) = res {
276 self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
277 }
278
279 res
280 }
281
282 fn resolve(
285 &mut self,
286 id: PayloadId,
287 kind: PayloadKind,
288 ) -> Option<PayloadFuture<T::BuiltPayload>> {
289 trace!(%id, "resolving payload job");
290
291 let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
292 let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
293
294 if keep_alive == KeepPayloadJobAlive::No {
295 let (_, id) = self.payload_jobs.swap_remove(job);
296 trace!(%id, "terminated resolved job");
297 }
298
299 let resolved_metrics = self.metrics.clone();
302 let payload_events = self.payload_events.clone();
303
304 let fut = async move {
305 let res = fut.await;
306 if let Ok(ref payload) = res {
307 payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
308
309 resolved_metrics
310 .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
311 }
312 res.map(|p| p.into())
313 };
314
315 Some(Box::pin(fut))
316 }
317}
318
319impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
320where
321 T: PayloadTypes,
322 Gen: PayloadJobGenerator,
323 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
324 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
325{
326 fn payload_attributes(
328 &self,
329 id: PayloadId,
330 ) -> Option<Result<<Gen::Job as PayloadJob>::PayloadAttributes, PayloadBuilderError>> {
331 let attributes = self
332 .payload_jobs
333 .iter()
334 .find(|(_, job_id)| *job_id == id)
335 .map(|(j, _)| j.payload_attributes());
336
337 if attributes.is_none() {
338 trace!(%id, "no matching payload job found to get attributes for");
339 }
340
341 attributes
342 }
343}
344
345impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
346where
347 T: PayloadTypes,
348 N: NodePrimitives,
349 Gen: PayloadJobGenerator + Unpin + 'static,
350 <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
351 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
352 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
353 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
354{
355 type Output = ();
356
357 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
358 let this = self.get_mut();
359 loop {
360 while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
362 this.generator.on_new_state(new_head);
363 }
364
365 for idx in (0..this.payload_jobs.len()).rev() {
369 let (mut job, id) = this.payload_jobs.swap_remove(idx);
370
371 match job.poll_unpin(cx) {
373 Poll::Ready(Ok(_)) => {
374 this.metrics.set_active_jobs(this.payload_jobs.len());
375 trace!(%id, "payload job finished");
376 }
377 Poll::Ready(Err(err)) => {
378 warn!(%err, ?id, "Payload builder job failed; resolving payload");
379 this.metrics.inc_failed_jobs();
380 this.metrics.set_active_jobs(this.payload_jobs.len());
381 }
382 Poll::Pending => {
383 this.payload_jobs.push((job, id));
385 }
386 }
387 }
388
389 let mut new_job = false;
391
392 while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
394 match cmd {
395 PayloadServiceCommand::BuildNewPayload(attr, tx) => {
396 let id = attr.payload_id();
397 let mut res = Ok(id);
398
399 if this.contains_payload(id) {
400 debug!(%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
401 } else {
402 let parent = attr.parent();
404 match this.generator.new_payload_job(attr.clone()) {
405 Ok(job) => {
406 info!(%id, %parent, "New payload job created");
407 this.metrics.inc_initiated_jobs();
408 new_job = true;
409 this.payload_jobs.push((job, id));
410 this.payload_events.send(Events::Attributes(attr.clone())).ok();
411 }
412 Err(err) => {
413 this.metrics.inc_failed_jobs();
414 warn!(%err, %id, "Failed to create payload builder job");
415 res = Err(err);
416 }
417 }
418 }
419
420 let _ = tx.send(res);
422 }
423 PayloadServiceCommand::BestPayload(id, tx) => {
424 let _ = tx.send(this.best_payload(id));
425 }
426 PayloadServiceCommand::PayloadAttributes(id, tx) => {
427 let attributes = this.payload_attributes(id);
428 let _ = tx.send(attributes);
429 }
430 PayloadServiceCommand::Resolve(id, strategy, tx) => {
431 let _ = tx.send(this.resolve(id, strategy));
432 }
433 PayloadServiceCommand::Subscribe(tx) => {
434 let new_rx = this.payload_events.subscribe();
435 let _ = tx.send(new_rx);
436 }
437 }
438 }
439
440 if !new_job {
441 return Poll::Pending
442 }
443 }
444 }
445}
446
447pub enum PayloadServiceCommand<T: PayloadTypes> {
449 BuildNewPayload(
451 T::PayloadBuilderAttributes,
452 oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
453 ),
454 BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
456 PayloadAttributes(
458 PayloadId,
459 oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
460 ),
461 Resolve(
463 PayloadId,
464 PayloadKind,
465 oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
466 ),
467 Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
469}
470
471impl<T> fmt::Debug for PayloadServiceCommand<T>
472where
473 T: PayloadTypes,
474{
475 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
476 match self {
477 Self::BuildNewPayload(f0, f1) => {
478 f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
479 }
480 Self::BestPayload(f0, f1) => {
481 f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
482 }
483 Self::PayloadAttributes(f0, f1) => {
484 f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
485 }
486 Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
487 Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
488 }
489 }
490}