reth_payload_builder_primitives/
events.rs
1use reth_payload_primitives::PayloadTypes;
2use std::{
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6use tokio::sync::broadcast;
7use tokio_stream::{
8 wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
9 Stream, StreamExt,
10};
11use tracing::debug;
12
13#[derive(Clone, Debug)]
15pub enum Events<T: PayloadTypes> {
16 Attributes(T::PayloadBuilderAttributes),
19 BuiltPayload(T::BuiltPayload),
23}
24
25#[derive(Debug)]
27pub struct PayloadEvents<T: PayloadTypes> {
28 pub receiver: broadcast::Receiver<Events<T>>,
30}
31
32impl<T: PayloadTypes> PayloadEvents<T> {
33 pub fn into_stream(self) -> BroadcastStream<Events<T>> {
35 BroadcastStream::new(self.receiver)
36 }
37 pub async fn recv(self) -> Option<Result<Events<T>, BroadcastStreamRecvError>> {
39 let mut event_stream = self.into_stream();
40 event_stream.next().await
41 }
42
43 pub fn into_built_payload_stream(self) -> BuiltPayloadStream<T> {
45 BuiltPayloadStream { st: self.into_stream() }
46 }
47
48 pub fn into_attributes_stream(self) -> PayloadAttributeStream<T> {
50 PayloadAttributeStream { st: self.into_stream() }
51 }
52}
53
54#[derive(Debug)]
56#[pin_project::pin_project]
57pub struct BuiltPayloadStream<T: PayloadTypes> {
58 #[pin]
60 st: BroadcastStream<Events<T>>,
61}
62
63impl<T: PayloadTypes> Stream for BuiltPayloadStream<T> {
64 type Item = T::BuiltPayload;
65
66 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 loop {
68 return match ready!(self.as_mut().project().st.poll_next(cx)) {
69 Some(Ok(Events::BuiltPayload(payload))) => Poll::Ready(Some(payload)),
70 Some(Ok(Events::Attributes(_))) => {
71 continue
73 }
74 Some(Err(err)) => {
75 debug!(%err, "payload event stream lagging behind");
76 continue
77 }
78 None => Poll::Ready(None),
79 }
80 }
81 }
82}
83
84#[derive(Debug)]
86#[pin_project::pin_project]
87pub struct PayloadAttributeStream<T: PayloadTypes> {
88 #[pin]
90 st: BroadcastStream<Events<T>>,
91}
92
93impl<T: PayloadTypes> Stream for PayloadAttributeStream<T> {
94 type Item = T::PayloadBuilderAttributes;
95
96 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
97 loop {
98 return match ready!(self.as_mut().project().st.poll_next(cx)) {
99 Some(Ok(Events::Attributes(attr))) => Poll::Ready(Some(attr)),
100 Some(Ok(Events::BuiltPayload(_))) => {
101 continue
103 }
104 Some(Err(err)) => {
105 debug!(%err, "payload event stream lagging behind");
106 continue
107 }
108 None => Poll::Ready(None),
109 }
110 }
111 }
112}