use crate::{
metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, KeepPayloadJobAlive,
PayloadJob,
};
use alloy_rpc_types::engine::PayloadId;
use futures_util::{future::FutureExt, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_payload_builder_primitives::{
Events, PayloadBuilder, PayloadBuilderError, PayloadEvents, PayloadStoreExt,
};
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
use reth_primitives_traits::NodePrimitives;
use std::{
fmt,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{
broadcast, mpsc,
oneshot::{self, Receiver},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, info, trace, warn};
/// Boxed, pinned future that resolves to either a payload of type `P` or a
/// [`PayloadBuilderError`].
///
/// The trait object is required to be both `Send` and `Sync`.
type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
/// Front-end for querying payloads.
///
/// Every query is forwarded to a shared, type-erased [`PayloadStoreExt`] implementation.
#[derive(Debug)]
pub struct PayloadStore<T: PayloadTypes> {
/// Backend that answers all queries issued through this store.
inner: Arc<dyn PayloadStoreExt<T>>,
}
impl<T> PayloadStore<T>
where
    T: PayloadTypes,
{
    /// Asks the backend to resolve the payload job `id` using the given `kind`.
    ///
    /// The backend's answer is returned unchanged.
    pub async fn resolve_kind(
        &self,
        id: PayloadId,
        kind: PayloadKind,
    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
        self.inner.resolve_kind(id, kind).await
    }

    /// Resolves the payload job `id` with [`PayloadKind::Earliest`].
    ///
    /// Shorthand for [`Self::resolve_kind`] with that fixed kind.
    pub async fn resolve(
        &self,
        id: PayloadId,
    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
        self.inner.resolve_kind(id, PayloadKind::Earliest).await
    }

    /// Asks the backend for the best payload currently known for job `id`.
    ///
    /// The backend's answer is returned unchanged.
    pub async fn best_payload(
        &self,
        id: PayloadId,
    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
        self.inner.best_payload(id).await
    }

    /// Asks the backend for the attributes associated with job `id`.
    ///
    /// The backend's answer is returned unchanged.
    pub async fn payload_attributes(
        &self,
        id: PayloadId,
    ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
        self.inner.payload_attributes(id).await
    }
}
impl<T> PayloadStore<T>
where
    T: PayloadTypes,
{
    /// Builds a store on top of any `'static` [`PayloadStoreExt`] implementation.
    ///
    /// The implementation is moved behind an [`Arc`] and type-erased.
    pub fn new<P>(inner: P) -> Self
    where
        P: PayloadStoreExt<T> + 'static,
    {
        let inner: Arc<dyn PayloadStoreExt<T>> = Arc::new(inner);
        Self { inner }
    }
}
impl<T> From<PayloadBuilderHandle<T>> for PayloadStore<T>
where
T: PayloadTypes,
{
fn from(inner: PayloadBuilderHandle<T>) -> Self {
Self::new(inner)
}
}
/// Cloneable handle that talks to a [`PayloadBuilderService`] by sending it
/// [`PayloadServiceCommand`]s over an unbounded channel.
#[derive(Debug)]
pub struct PayloadBuilderHandle<T: PayloadTypes> {
/// Sending half of the service's command channel.
to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
}
#[async_trait::async_trait]
impl<T> PayloadBuilder for PayloadBuilderHandle<T>
where
    T: PayloadTypes,
{
    type PayloadType = T;
    type Error = PayloadBuilderError;

    /// Submits a `BuildNewPayload` command and hands back the receiver for the reply.
    ///
    /// A failed send is deliberately ignored here. The reply sender is dropped together with
    /// the rejected command, so the failure surfaces when the returned receiver is awaited.
    fn send_new_payload(
        &self,
        attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
    ) -> Receiver<Result<PayloadId, Self::Error>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let command = PayloadServiceCommand::BuildNewPayload(attr, reply_tx);
        let _ = self.to_service.send(command);
        reply_rx
    }

    /// Requests the best payload currently available for job `id`.
    ///
    /// Returns `None` when the command cannot be sent, when the reply channel is closed
    /// without an answer, or when the service itself answers `None`.
    async fn best_payload(
        &self,
        id: PayloadId,
    ) -> Option<Result<<Self::PayloadType as PayloadTypes>::BuiltPayload, Self::Error>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let command = PayloadServiceCommand::BestPayload(id, reply_tx);
        self.to_service.send(command).ok()?;
        reply_rx.await.ok().flatten()
    }

    /// Requests resolution of job `id` with the given `kind` and awaits the resulting payload.
    ///
    /// Returns `None` when the command cannot be sent or when the service answers `None`.
    /// A closed reply channel is reported as `Some(Err(_))`.
    async fn resolve_kind(
        &self,
        id: PayloadId,
        kind: PayloadKind,
    ) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let command = PayloadServiceCommand::Resolve(id, kind, reply_tx);
        self.to_service.send(command).ok()?;
        match reply_rx.await {
            // The service hands back a future; the payload is only known once it completes.
            Ok(Some(fut)) => Some(fut.await),
            Ok(None) => None,
            Err(err) => Some(Err(err.into())),
        }
    }

    /// Requests a new subscription to the service's payload events.
    ///
    /// A failed send is ignored; it shows up as an error when the reply is awaited.
    async fn subscribe(&self) -> Result<PayloadEvents<Self::PayloadType>, Self::Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let _ = self.to_service.send(PayloadServiceCommand::Subscribe(reply_tx));
        let receiver = reply_rx.await?;
        Ok(PayloadEvents { receiver })
    }

    /// Requests the attributes associated with job `id`.
    ///
    /// Returns `None` when the command cannot be sent, when the reply channel is closed
    /// without an answer, or when the service itself answers `None`.
    async fn payload_attributes(
        &self,
        id: PayloadId,
    ) -> Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let command = PayloadServiceCommand::PayloadAttributes(id, reply_tx);
        self.to_service.send(command).ok()?;
        reply_rx.await.ok().flatten()
    }
}
impl<T> PayloadBuilderHandle<T>
where
T: PayloadTypes,
{
pub const fn new(to_service: mpsc::UnboundedSender<PayloadServiceCommand<T>>) -> Self {
Self { to_service }
}
}
impl<T> Clone for PayloadBuilderHandle<T>
where
    T: PayloadTypes,
{
    /// Produces another handle that sends into the same command channel.
    fn clone(&self) -> Self {
        let to_service = self.to_service.clone();
        Self { to_service }
    }
}
/// Future that owns the tracked payload jobs and answers [`PayloadServiceCommand`]s.
///
/// It does nothing unless it is polled: polling drives the jobs, forwards chain events to the
/// generator and drains the command channel.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PayloadBuilderService<Gen, St, T>
where
T: PayloadTypes,
Gen: PayloadJobGenerator,
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
{
/// Creates new payload jobs and is notified of every chain event.
generator: Gen,
/// Jobs currently tracked by the service, each paired with its payload id.
payload_jobs: Vec<(Gen::Job, PayloadId)>,
/// Sender kept around so that new handles can be created via `handle()`.
service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
/// Stream of commands sent through the handles.
command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
/// Metrics recorded by the service.
metrics: PayloadBuilderServiceMetrics,
/// Stream of chain events that are forwarded to the generator.
chain_events: St,
/// Broadcast sender used to publish payload events to subscribers.
payload_events: broadcast::Sender<Events<T>>,
}
/// Capacity passed to the broadcast channel that carries payload events.
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
where
T: PayloadTypes,
Gen: PayloadJobGenerator,
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
{
pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
let service = Self {
generator,
payload_jobs: Vec::new(),
service_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
metrics: Default::default(),
chain_events,
payload_events,
};
let handle = service.handle();
(service, handle)
}
pub fn handle(&self) -> PayloadBuilderHandle<T> {
PayloadBuilderHandle::new(self.service_tx.clone())
}
fn contains_payload(&self, id: PayloadId) -> bool {
self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
}
fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let res = self
.payload_jobs
.iter()
.find(|(_, job_id)| *job_id == id)
.map(|(j, _)| j.best_payload().map(|p| p.into()));
if let Some(Ok(ref best)) = res {
self.metrics.set_best_revenue(best.block().number, f64::from(best.fees()));
}
res
}
fn resolve(
&mut self,
id: PayloadId,
kind: PayloadKind,
) -> Option<PayloadFuture<T::BuiltPayload>> {
trace!(%id, "resolving payload job");
let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
if keep_alive == KeepPayloadJobAlive::No {
let (_, id) = self.payload_jobs.swap_remove(job);
trace!(%id, "terminated resolved job");
}
let resolved_metrics = self.metrics.clone();
let payload_events = self.payload_events.clone();
let fut = async move {
let res = fut.await;
if let Ok(ref payload) = res {
payload_events.send(Events::BuiltPayload(payload.clone().into())).ok();
resolved_metrics
.set_resolved_revenue(payload.block().number, f64::from(payload.fees()));
}
res.map(|p| p.into())
};
Some(Box::pin(fut))
}
}
impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
where
    T: PayloadTypes,
    Gen: PayloadJobGenerator,
    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
{
    /// Returns the attributes reported by the job with the given id.
    ///
    /// Yields `None`, and emits a trace message, when no such job is tracked.
    fn payload_attributes(
        &self,
        id: PayloadId,
    ) -> Option<Result<<Gen::Job as PayloadJob>::PayloadAttributes, PayloadBuilderError>> {
        match self.payload_jobs.iter().find(|(_, job_id)| *job_id == id) {
            Some((job, _)) => Some(job.payload_attributes()),
            None => {
                trace!(%id, "no matching payload job found to get attributes for");
                None
            }
        }
    }
}
impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
where
    T: PayloadTypes,
    N: NodePrimitives,
    Gen: PayloadJobGenerator + Unpin + 'static,
    <Gen as PayloadJobGenerator>::Job: Unpin + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
    <Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
{
    type Output = ();

    /// Drives the service.
    ///
    /// Each pass of the loop:
    /// 1. forwards all ready chain events to the generator,
    /// 2. polls every tracked job, dropping the ones that completed or failed,
    /// 3. drains and answers all ready commands.
    ///
    /// The pass is repeated when a command created a new job, so that the new job is polled
    /// as well; otherwise `Poll::Pending` is returned. This implementation never returns
    /// `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            // Hand every ready chain event to the generator.
            while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
                this.generator.on_new_state(new_head);
            }

            // Poll all jobs before serving commands. Job order is irrelevant, so each job is
            // taken out with `swap_remove` and only pushed back if it is still pending.
            // Iterating in reverse keeps the not-yet-visited indices valid.
            for idx in (0..this.payload_jobs.len()).rev() {
                let (mut job, id) = this.payload_jobs.swap_remove(idx);
                match job.poll_unpin(cx) {
                    Poll::Ready(Ok(_)) => {
                        this.metrics.set_active_jobs(this.payload_jobs.len());
                        trace!(%id, "payload job finished");
                    }
                    Poll::Ready(Err(err)) => {
                        warn!(%err, ?id, "Payload builder job failed; resolving payload");
                        this.metrics.inc_failed_jobs();
                        this.metrics.set_active_jobs(this.payload_jobs.len());
                    }
                    Poll::Pending => {
                        // Still running: keep tracking it.
                        this.payload_jobs.push((job, id));
                    }
                }
            }

            // Set when a command created a job; decides whether another pass is needed.
            let mut new_job = false;

            // Answer every command that is ready.
            while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
                match cmd {
                    PayloadServiceCommand::BuildNewPayload(attr, tx) => {
                        let id = attr.payload_id();
                        let mut res = Ok(id);
                        if this.contains_payload(id) {
                            debug!(%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
                        } else {
                            // No job for this id yet, so create one.
                            let parent = attr.parent();
                            match this.generator.new_payload_job(attr.clone()) {
                                Ok(job) => {
                                    info!(%id, %parent, "New payload job created");
                                    this.metrics.inc_initiated_jobs();
                                    new_job = true;
                                    this.payload_jobs.push((job, id));
                                    // `attr` is not used past this point, so it is moved into
                                    // the event instead of being cloned a second time.
                                    this.payload_events.send(Events::Attributes(attr)).ok();
                                }
                                Err(err) => {
                                    this.metrics.inc_failed_jobs();
                                    warn!(%err, %id, "Failed to create payload builder job");
                                    res = Err(err);
                                }
                            }
                        }
                        // Report the payload id, or the creation error, to the requester.
                        let _ = tx.send(res);
                    }
                    PayloadServiceCommand::BestPayload(id, tx) => {
                        let _ = tx.send(this.best_payload(id));
                    }
                    PayloadServiceCommand::PayloadAttributes(id, tx) => {
                        let attributes = this.payload_attributes(id);
                        let _ = tx.send(attributes);
                    }
                    PayloadServiceCommand::Resolve(id, strategy, tx) => {
                        let _ = tx.send(this.resolve(id, strategy));
                    }
                    PayloadServiceCommand::Subscribe(tx) => {
                        let new_rx = this.payload_events.subscribe();
                        let _ = tx.send(new_rx);
                    }
                }
            }

            if !new_job {
                return Poll::Pending
            }
        }
    }
}
/// Commands a [`PayloadBuilderHandle`] sends to the [`PayloadBuilderService`].
///
/// Every variant carries a oneshot sender on which the service delivers its answer.
pub enum PayloadServiceCommand<T: PayloadTypes> {
/// Start a payload job for the given attributes; answered with the payload id or the error
/// that prevented the job from being created.
BuildNewPayload(
T::PayloadBuilderAttributes,
oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
),
/// Query the best payload of the job with the given id.
BestPayload(PayloadId, oneshot::Sender<Option<Result<T::BuiltPayload, PayloadBuilderError>>>),
/// Query the attributes of the job with the given id.
PayloadAttributes(
PayloadId,
oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
),
/// Resolve the job with the given id using the given kind; answered with a future that
/// yields the payload.
Resolve(
PayloadId,
PayloadKind,
oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
),
/// Request a new receiver for the service's payload events.
Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
}
impl<T> fmt::Debug for PayloadServiceCommand<T>
where
    T: PayloadTypes,
{
    /// Formats the command as a tuple named after its variant.
    ///
    /// The reply sender of `Resolve` is left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::BuildNewPayload(attributes, reply) => {
                f.debug_tuple("BuildNewPayload").field(attributes).field(reply).finish()
            }
            Self::BestPayload(id, reply) => {
                f.debug_tuple("BestPayload").field(id).field(reply).finish()
            }
            Self::PayloadAttributes(id, reply) => {
                f.debug_tuple("PayloadAttributes").field(id).field(reply).finish()
            }
            Self::Resolve(id, kind, _) => f.debug_tuple("Resolve").field(id).field(kind).finish(),
            Self::Subscribe(reply) => f.debug_tuple("Subscribe").field(reply).finish(),
        }
    }
}