reth_engine_util/
skip_new_payload.rs

//! Stream wrapper that skips a specified number of new payload messages.
2
3use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, ExecutionPayload};
6use reth_payload_primitives::PayloadTypes;
7use std::{
8    pin::Pin,
9    task::{ready, Context, Poll},
10};
11
12/// Engine API stream wrapper that skips the specified number of new payload messages.
13#[derive(Debug)]
14#[pin_project::pin_project]
15pub struct EngineSkipNewPayload<S> {
16    #[pin]
17    stream: S,
18    /// The number of messages to skip.
19    threshold: usize,
20    /// Current count of skipped messages.
21    skipped: usize,
22}
23
24impl<S> EngineSkipNewPayload<S> {
25    /// Creates new [`EngineSkipNewPayload`] stream wrapper.
26    pub const fn new(stream: S, threshold: usize) -> Self {
27        Self { stream, threshold, skipped: 0 }
28    }
29}
30
31impl<S, T> Stream for EngineSkipNewPayload<S>
32where
33    S: Stream<Item = BeaconEngineMessage<T>>,
34    T: PayloadTypes,
35{
36    type Item = S::Item;
37
38    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39        let mut this = self.project();
40
41        loop {
42            let next = ready!(this.stream.poll_next_unpin(cx));
43            let item = match next {
44                Some(BeaconEngineMessage::NewPayload { payload, tx }) => {
45                    if this.skipped < this.threshold {
46                        *this.skipped += 1;
47                        tracing::warn!(
48                            target: "engine::stream::skip_new_payload",
49                            block_number = payload.block_number(),
50                            block_hash = %payload.block_hash(),
51                            ?payload,
52                            threshold=this.threshold,
53                            skipped=this.skipped, "Skipping new payload"
54                        );
55                        let _ = tx.send(Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)));
56                        continue
57                    }
58                    *this.skipped = 0;
59                    Some(BeaconEngineMessage::NewPayload { payload, tx })
60                }
61                next => next,
62            };
63            return Poll::Ready(item)
64        }
65    }
66}