// reth_payload_builder_primitives/events.rs
use reth_payload_primitives::PayloadTypes;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::broadcast;
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
Stream, StreamExt,
};
use tracing::debug;
/// Events carried over the payload event channel (see [`PayloadEvents`]).
///
/// Each variant wraps one of the associated types of the [`PayloadTypes`]
/// implementation `T`.
#[derive(Clone, Debug)]
pub enum Events<T>
where
    T: PayloadTypes,
{
    /// Carries a set of payload builder attributes.
    Attributes(T::PayloadBuilderAttributes),
    /// Carries a built payload.
    BuiltPayload(T::BuiltPayload),
}
/// Handle around a broadcast receiver of [`Events`].
///
/// The inherent methods consume the handle and turn the receiver into a
/// stream, optionally filtered down to a single event kind.
#[derive(Debug)]
pub struct PayloadEvents<T>
where
    T: PayloadTypes,
{
    /// The broadcast receiver the events are read from.
    pub receiver: broadcast::Receiver<Events<T>>,
}
impl<T: PayloadTypes> PayloadEvents<T> {
    /// Consumes the handle and wraps its receiver in a [`BroadcastStream`]
    /// that yields every event, including receive errors.
    pub fn into_stream(self) -> BroadcastStream<Events<T>> {
        let Self { receiver } = self;
        BroadcastStream::new(receiver)
    }

    /// Consumes the handle and waits for the next item of the event stream.
    ///
    /// Returns `None` when the underlying stream has ended; otherwise the
    /// next event or the receive error reported by the stream.
    pub async fn recv(self) -> Option<Result<Events<T>, BroadcastStreamRecvError>> {
        self.into_stream().next().await
    }

    /// Consumes the handle and returns a stream that yields only the payloads
    /// of [`Events::BuiltPayload`] events.
    pub fn into_built_payload_stream(self) -> BuiltPayloadStream<T> {
        let st = self.into_stream();
        BuiltPayloadStream { st }
    }

    /// Consumes the handle and returns a stream that yields only the
    /// attributes of [`Events::Attributes`] events.
    pub fn into_attributes_stream(self) -> PayloadAttributeStream<T> {
        let st = self.into_stream();
        PayloadAttributeStream { st }
    }
}
/// Stream adapter over a [`BroadcastStream`] of [`Events`].
///
/// Its [`Stream`] implementation yields only the built payloads and skips
/// every other item of the inner stream.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct BuiltPayloadStream<T>
where
    T: PayloadTypes,
{
    /// The inner event stream, structurally pinned so it can be polled in place.
    #[pin]
    st: BroadcastStream<Events<T>>,
}
impl<T: PayloadTypes> Stream for BuiltPayloadStream<T> {
    type Item = T::BuiltPayload;

    /// Polls the inner event stream until it produces a built payload.
    ///
    /// * [`Events::BuiltPayload`] items are yielded to the caller.
    /// * [`Events::Attributes`] items are skipped.
    /// * Receive errors are logged at debug level and skipped.
    /// * `None` from the inner stream ends this stream as well.
    ///
    /// Returns [`Poll::Pending`] whenever the inner stream is pending.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Project once; the pinned inner stream is re-borrowed on every iteration.
        let mut this = self.project();
        loop {
            match ready!(this.st.as_mut().poll_next(cx)) {
                Some(Ok(Events::BuiltPayload(payload))) => return Poll::Ready(Some(payload)),
                // Not the event kind this stream is interested in; keep polling.
                Some(Ok(Events::Attributes(_))) => {}
                // A receive error does not terminate the stream: log it and
                // carry on with the next item.
                Some(Err(err)) => {
                    debug!(%err, "payload event stream lagging behind");
                }
                None => return Poll::Ready(None),
            }
        }
    }
}
/// Stream adapter over a [`BroadcastStream`] of [`Events`].
///
/// Its [`Stream`] implementation yields only the payload builder attributes
/// and skips every other item of the inner stream.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct PayloadAttributeStream<T>
where
    T: PayloadTypes,
{
    /// The inner event stream, structurally pinned so it can be polled in place.
    #[pin]
    st: BroadcastStream<Events<T>>,
}
impl<T: PayloadTypes> Stream for PayloadAttributeStream<T> {
    type Item = T::PayloadBuilderAttributes;

    /// Polls the inner event stream until it produces payload attributes.
    ///
    /// * [`Events::Attributes`] items are yielded to the caller.
    /// * [`Events::BuiltPayload`] items are skipped.
    /// * Receive errors are logged at debug level and skipped.
    /// * `None` from the inner stream ends this stream as well.
    ///
    /// Returns [`Poll::Pending`] whenever the inner stream is pending.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Project once; the pinned inner stream is re-borrowed on every iteration.
        let mut this = self.project();
        loop {
            match ready!(this.st.as_mut().poll_next(cx)) {
                Some(Ok(Events::Attributes(attr))) => return Poll::Ready(Some(attr)),
                // Not the event kind this stream is interested in; keep polling.
                Some(Ok(Events::BuiltPayload(_))) => {}
                // A receive error does not terminate the stream: log it and
                // carry on with the next item.
                Some(Err(err)) => {
                    debug!(%err, "payload event stream lagging behind");
                }
                None => return Poll::Ready(None),
            }
        }
    }
}