reth_payload_builder_primitives/
events.rs

1use reth_payload_primitives::PayloadTypes;
2use std::{
3    pin::Pin,
4    task::{ready, Context, Poll},
5};
6use tokio::sync::broadcast;
7use tokio_stream::{
8    wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
9    Stream, StreamExt,
10};
11use tracing::debug;
12
13/// Payload builder events.
14#[derive(Clone, Debug)]
15pub enum Events<T: PayloadTypes> {
16    /// The payload attributes as
17    /// they are received from the CL through the engine api.
18    Attributes(T::PayloadBuilderAttributes),
19    /// The built payload that has been just built.
20    /// Triggered by the CL whenever it asks for an execution payload.
21    /// This event is only thrown if the CL is a validator.
22    BuiltPayload(T::BuiltPayload),
23}
24
25/// Represents a receiver for various payload events.
26#[derive(Debug)]
27pub struct PayloadEvents<T: PayloadTypes> {
28    /// The receiver for the payload events.
29    pub receiver: broadcast::Receiver<Events<T>>,
30}
31
32impl<T: PayloadTypes> PayloadEvents<T> {
33    /// Convert this receiver into a stream of `PayloadEvents`.
34    pub fn into_stream(self) -> BroadcastStream<Events<T>> {
35        BroadcastStream::new(self.receiver)
36    }
37    /// Asynchronously receives the next payload event.
38    pub async fn recv(self) -> Option<Result<Events<T>, BroadcastStreamRecvError>> {
39        let mut event_stream = self.into_stream();
40        event_stream.next().await
41    }
42
43    /// Returns a new stream that yields all built payloads.
44    pub fn into_built_payload_stream(self) -> BuiltPayloadStream<T> {
45        BuiltPayloadStream { st: self.into_stream() }
46    }
47
48    /// Returns a new stream that yields received payload attributes
49    pub fn into_attributes_stream(self) -> PayloadAttributeStream<T> {
50        PayloadAttributeStream { st: self.into_stream() }
51    }
52}
53
54/// A stream that yields built payloads.
55#[derive(Debug)]
56#[pin_project::pin_project]
57pub struct BuiltPayloadStream<T: PayloadTypes> {
58    /// The stream of events.
59    #[pin]
60    st: BroadcastStream<Events<T>>,
61}
62
63impl<T: PayloadTypes> Stream for BuiltPayloadStream<T> {
64    type Item = T::BuiltPayload;
65
66    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        loop {
68            return match ready!(self.as_mut().project().st.poll_next(cx)) {
69                Some(Ok(Events::BuiltPayload(payload))) => Poll::Ready(Some(payload)),
70                Some(Ok(Events::Attributes(_))) => {
71                    // ignoring attributes
72                    continue
73                }
74                Some(Err(err)) => {
75                    debug!(%err, "payload event stream lagging behind");
76                    continue
77                }
78                None => Poll::Ready(None),
79            }
80        }
81    }
82}
83
84/// A stream that yields received payload attributes
85#[derive(Debug)]
86#[pin_project::pin_project]
87pub struct PayloadAttributeStream<T: PayloadTypes> {
88    /// The stream of events.
89    #[pin]
90    st: BroadcastStream<Events<T>>,
91}
92
93impl<T: PayloadTypes> Stream for PayloadAttributeStream<T> {
94    type Item = T::PayloadBuilderAttributes;
95
96    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
97        loop {
98            return match ready!(self.as_mut().project().st.poll_next(cx)) {
99                Some(Ok(Events::Attributes(attr))) => Poll::Ready(Some(attr)),
100                Some(Ok(Events::BuiltPayload(_))) => {
101                    // ignoring payloads
102                    continue
103                }
104                Some(Err(err)) => {
105                    debug!(%err, "payload event stream lagging behind");
106                    continue
107                }
108                None => Poll::Ready(None),
109            }
110        }
111    }
112}