1use crate::{
7 metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
8 PayloadJob,
9};
10use alloy_consensus::BlockHeader;
11use alloy_rpc_types::engine::PayloadId;
12use futures_util::{future::FutureExt, Stream, StreamExt};
13use reth_chain_state::CanonStateNotification;
14use reth_payload_builder_primitives::{Events, PayloadBuilderError, PayloadEvents};
15use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
16use reth_primitives_traits::NodePrimitives;
17use std::{
18 fmt,
19 future::Future,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23};
24use tokio::sync::{
25 broadcast, mpsc,
26 oneshot::{self, Receiver},
27};
28use tokio_stream::wrappers::UnboundedReceiverStream;
29use tracing::{debug, info, trace, warn};
30
/// Boxed, pinned future that yields either a built payload `P` or a [`PayloadBuilderError`].
///
/// The trait object is required to be `Send + Sync`.
type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
32
33#[derive(Debug)]
38pub struct PayloadStore<T: PayloadTypes> {
39 inner: Arc<PayloadBuilderHandle<T>>,
40}
41
42impl<T> PayloadStore<T>
43where
44 T: PayloadTypes,
45{
46 pub async fn resolve_kind(
51 &self,
52 id: PayloadId,
53 kind: PayloadKind,
54 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
55 self.inner.resolve_kind(id, kind).await
56 }
57
58 pub async fn resolve(
60 &self,
61 id: PayloadId,
62 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
63 self.resolve_kind(id, PayloadKind::Earliest).await
64 }
65
66 pub async fn best_payload(
70 &self,
71 id: PayloadId,
72 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
73 self.inner.best_payload(id).await
74 }
75
76 pub async fn payload_timestamp(
80 &self,
81 id: PayloadId,
82 ) -> Option<Result<u64, PayloadBuilderError>> {
83 self.inner.payload_timestamp(id).await
84 }
85}
86
87impl<T> PayloadStore<T>
88where
89 T: PayloadTypes,
90{
91 pub fn new(inner: PayloadBuilderHandle<T>) -> Self {
93 Self { inner: Arc::new(inner) }
94 }
95}
96
97impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
98where
99 T: PayloadTypes,
100{
101 fn from(inner: PayloadBuilderHandle<T>) -> Self {
102 Self::new(inner)
103 }
104}
105
106#[derive(Debug)]
110pub struct PayloadBuilderHandle<T: PayloadTypes> {
111 to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
113}
114
115impl<T: PayloadTypes> PayloadBuilderHandle<T> {
116 pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
121 Self { to_service }
122 }
123
124 pub fn send_new_payload(
128 &self,
129 attr: T::PayloadBuilderAttributes,
130 ) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
131 let (tx, rx) = oneshot::channel();
132 let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
133 rx
134 }
135
136 pub async fn best_payload(
139 &self,
140 id: PayloadId,
141 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
142 let (tx, rx) = oneshot::channel();
143 self.to_service.send(PayloadServiceCommand::BestPayload(id, tx)).ok()?;
144 rx.await.ok()?
145 }
146
147 pub async fn resolve_kind(
149 &self,
150 id: PayloadId,
151 kind: PayloadKind,
152 ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
153 let (tx, rx) = oneshot::channel();
154 self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
155 match rx.await.transpose()? {
156 Ok(fut) => Some(fut.await),
157 Err(e) => Some(Err(e.into())),
158 }
159 }
160
161 pub async fn subscribe(&self) -> Result<PayloadEvents<T>, PayloadBuilderError> {
164 let (tx, rx) = oneshot::channel();
165 let _ = self.to_service.send(PayloadServiceCommand::Subscribe(tx));
166 Ok(PayloadEvents { receiver: rx.await? })
167 }
168
169 pub async fn payload_timestamp(
173 &self,
174 id: PayloadId,
175 ) -> Option<Result<u64, PayloadBuilderError>> {
176 let (tx, rx) = oneshot::channel();
177 self.to_service.send(PayloadServiceCommand::PayloadTimestamp(id, tx)).ok()?;
178 rx.await.ok()?
179 }
180}
181
182impl<T> Clone for PayloadBuilderHandle<T>
183where
184 T: PayloadTypes,
185{
186 fn clone(&self) -> Self {
187 Self { to_service: self.to_service.clone() }
188 }
189}
190
191#[derive(Debug)]
200#[must_use = "futures do nothing unless you `.await` or poll them"]
201pub struct PayloadBuilderService<Gen, St, T>
202where
203 T: PayloadTypes,
204 Gen: PayloadJobGenerator,
205 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
206{
207 generator: Gen,
209 payload_jobs: Vec<(Gen::Job, PayloadId)>,
211 service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
213 command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
215 metrics: PayloadBuilderServiceMetrics,
217 chain_events: St,
219 payload_events: broadcast::Sender<Events<T>>,
221}
222
/// Capacity passed to `broadcast::channel` when the payload events channel is created.
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
224
225impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
228where
229 T: PayloadTypes,
230 Gen: PayloadJobGenerator,
231 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
232 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
233{
234 pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
241 let (service_tx, command_rx) = mpsc::unbounded_channel();
242 let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
243
244 let service = Self {
245 generator,
246 payload_jobs: Vec::new(),
247 service_tx,
248 command_rx: UnboundedReceiverStream::new(command_rx),
249 metrics: Default::default(),
250 chain_events,
251 payload_events,
252 };
253
254 let handle = service.handle();
255 (service, handle)
256 }
257
258 pub fn handle(&self) -> PayloadBuilderHandle<T> {
260 PayloadBuilderHandle::new(self.service_tx.clone())
261 }
262
263 pub fn payload_events_handle(&self) -> broadcast::Sender<Events<T>> {
266 self.payload_events.clone()
267 }
268
269 fn contains_payload(&self, id: PayloadId) -> bool {
271 self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
272 }
273
274 fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
276 let res = self
277 .payload_jobs
278 .iter()
279 .find(|(_, job_id)| *job_id == id)
280 .map(|(j, _)| j.best_payload().map(|p| p.into()));
281 if let Some(Ok(ref best)) = res {
282 self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
283 }
284
285 res
286 }
287
288 fn resolve(
291 &mut self,
292 id: PayloadId,
293 kind: PayloadKind,
294 ) -> Option<PayloadFuture<T::BuiltPayload>> {
295 debug!(target: "payload_builder", %id, "resolving payload job");
296
297 let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
298 let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
299
300 if keep_alive == KeepPayloadJobAlive::No {
301 let (_, id) = self.payload_jobs.swap_remove(job);
302 debug!(target: "payload_builder", %id, "terminated resolved job");
303 }
304
305 let resolved_metrics = self.metrics.clone();
308 let payload_events = self.payload_events.clone();
309
310 let fut = async move {
311 let res = fut.await;
312 if let Ok(payload) = &res {
313 if payload_events.receiver_count() > 0 {
314 payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
315 }
316
317 resolved_metrics
318 .set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
319 }
320 res.map(|p| p.into())
321 };
322
323 Some(Box::pin(fut))
324 }
325}
326
327impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
328where
329 T: PayloadTypes,
330 Gen: PayloadJobGenerator,
331 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
332 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
333{
334 fn payload_timestamp(&self, id: PayloadId) -> Option<Result<u64, PayloadBuilderError>> {
336 let timestamp = self
337 .payload_jobs
338 .iter()
339 .find(|(_, job_id)| *job_id == id)
340 .map(|(j, _)| j.payload_timestamp());
341
342 if timestamp.is_none() {
343 trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
344 }
345
346 timestamp
347 }
348}
349
350impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
351where
352 T: PayloadTypes,
353 N: NodePrimitives,
354 Gen: PayloadJobGenerator + Unpin + 'static,
355 <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
356 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
357 Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
358 <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
359{
360 type Output = ();
361
362 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
363 let this = self.get_mut();
364 loop {
365 while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
367 this.generator.on_new_state(new_head);
368 }
369
370 for idx in (0..this.payload_jobs.len()).rev() {
374 let (mut job, id) = this.payload_jobs.swap_remove(idx);
375
376 match job.poll_unpin(cx) {
378 Poll::Ready(Ok(_)) => {
379 this.metrics.set_active_jobs(this.payload_jobs.len());
380 trace!(target: "payload_builder", %id, "payload job finished");
381 }
382 Poll::Ready(Err(err)) => {
383 warn!(target: "payload_builder",%err, ?id, "Payload builder job failed; resolving payload");
384 this.metrics.inc_failed_jobs();
385 this.metrics.set_active_jobs(this.payload_jobs.len());
386 }
387 Poll::Pending => {
388 this.payload_jobs.push((job, id));
390 }
391 }
392 }
393
394 let mut new_job = false;
396
397 while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
399 match cmd {
400 PayloadServiceCommand::BuildNewPayload(attr, tx) => {
401 let id = attr.payload_id();
402 let mut res = Ok(id);
403
404 if this.contains_payload(id) {
405 debug!(target: "payload_builder",%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
406 } else {
407 let parent = attr.parent();
409 match this.generator.new_payload_job(attr.clone()) {
410 Ok(job) => {
411 info!(target: "payload_builder", %id, %parent, "New payload job created");
412 this.metrics.inc_initiated_jobs();
413 new_job = true;
414 this.payload_jobs.push((job, id));
415 this.payload_events.send(Events::Attributes(attr.clone())).ok();
416 }
417 Err(err) => {
418 this.metrics.inc_failed_jobs();
419 warn!(target: "payload_builder", %err, %id, "Failed to create payload builder job");
420 res = Err(err);
421 }
422 }
423 }
424
425 let _ = tx.send(res);
427 }
428 PayloadServiceCommand::BestPayload(id, tx) => {
429 let _ = tx.send(this.best_payload(id));
430 }
431 PayloadServiceCommand::PayloadTimestamp(id, tx) => {
432 let timestamp = this.payload_timestamp(id);
433 let _ = tx.send(timestamp);
434 }
435 PayloadServiceCommand::Resolve(id, strategy, tx) => {
436 let _ = tx.send(this.resolve(id, strategy));
437 }
438 PayloadServiceCommand::Subscribe(tx) => {
439 let new_rx = this.payload_events.subscribe();
440 let _ = tx.send(new_rx);
441 }
442 }
443 }
444
445 if !new_job {
446 return Poll::Pending
447 }
448 }
449 }
450}
451
452pub enum PayloadServiceCommand<T: PayloadTypes> {
454 BuildNewPayload(
456 T::PayloadBuilderAttributes,
457 oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
458 ),
459 BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
461 PayloadTimestamp(PayloadId, oneshot::Sender<Option<Result<u64, PayloadBuilderError>>>),
463 Resolve(
465 PayloadId,
466 PayloadKind,
467 oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
468 ),
469 Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
471}
472
473impl<T> fmt::Debug for PayloadServiceCommand<T>
474where
475 T: PayloadTypes,
476{
477 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
478 match self {
479 Self::BuildNewPayload(f0, f1) => {
480 f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
481 }
482 Self::BestPayload(f0, f1) => {
483 f.debug_tuple("BestPayload").field(&f0).field(&f1).finish()
484 }
485 Self::PayloadTimestamp(f0, f1) => {
486 f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
487 }
488 Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
489 Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
490 }
491 }
492}