1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use reth_payload_primitives::PayloadTypes;
use std::{
    pin::Pin,
    task::{ready, Context, Poll},
};
use tokio::sync::broadcast;
use tokio_stream::{
    wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
    Stream, StreamExt,
};
use tracing::debug;

/// Payload builder events.
#[derive(Clone, Debug)]
pub enum Events<T: PayloadTypes> {
    /// The payload attributes as
    /// they are received from the CL through the engine api.
    Attributes(T::PayloadBuilderAttributes),
    /// The built payload that has been just built.
    /// Triggered by the CL whenever it asks for an execution payload.
    /// This event is only thrown if the CL is a validator.
    BuiltPayload(T::BuiltPayload),
}

/// Represents a receiver for various payload events.
#[derive(Debug)]
pub struct PayloadEvents<T: PayloadTypes> {
    /// The receiver for the payload events.
    pub receiver: broadcast::Receiver<Events<T>>,
}

impl<T: PayloadTypes + 'static> PayloadEvents<T> {
    /// Convert this receiver into a stream of `PayloadEvents`.
    pub fn into_stream(self) -> BroadcastStream<Events<T>> {
        BroadcastStream::new(self.receiver)
    }
    /// Asynchronously receives the next payload event.
    pub async fn recv(self) -> Option<Result<Events<T>, BroadcastStreamRecvError>> {
        let mut event_stream = self.into_stream();
        event_stream.next().await
    }

    /// Returns a new stream that yields all built payloads.
    pub fn into_built_payload_stream(self) -> BuiltPayloadStream<T> {
        BuiltPayloadStream { st: self.into_stream() }
    }

    /// Returns a new stream that yields received payload attributes
    pub fn into_attributes_stream(self) -> PayloadAttributeStream<T> {
        PayloadAttributeStream { st: self.into_stream() }
    }
}

/// A stream that yields built payloads.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct BuiltPayloadStream<T: PayloadTypes> {
    /// The stream of events.
    #[pin]
    st: BroadcastStream<Events<T>>,
}

impl<T: PayloadTypes + 'static> Stream for BuiltPayloadStream<T> {
    type Item = T::BuiltPayload;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            return match ready!(self.as_mut().project().st.poll_next(cx)) {
                Some(Ok(Events::BuiltPayload(payload))) => Poll::Ready(Some(payload)),
                Some(Ok(Events::Attributes(_))) => {
                    // ignoring attributes
                    continue
                }
                Some(Err(err)) => {
                    debug!(%err, "payload event stream stream lagging behind");
                    continue
                }
                None => Poll::Ready(None),
            }
        }
    }
}

/// A stream that yields received payload attributes
#[derive(Debug)]
#[pin_project::pin_project]
pub struct PayloadAttributeStream<T: PayloadTypes> {
    /// The stream of events.
    #[pin]
    st: BroadcastStream<Events<T>>,
}

impl<T: PayloadTypes + 'static> Stream for PayloadAttributeStream<T> {
    type Item = T::PayloadBuilderAttributes;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            return match ready!(self.as_mut().project().st.poll_next(cx)) {
                Some(Ok(Events::Attributes(attr))) => Poll::Ready(Some(attr)),
                Some(Ok(Events::BuiltPayload(_))) => {
                    // ignoring payloads
                    continue
                }
                Some(Err(err)) => {
                    debug!(%err, "payload event stream stream lagging behind");
                    continue
                }
                None => Poll::Ready(None),
            }
        }
    }
}