reth_engine_util/
skip_new_payload.rs

1//! Stream wrapper that skips specified number of new payload messages.
2
3use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, EngineTypes, ExecutionPayload};
6use std::{
7    pin::Pin,
8    task::{ready, Context, Poll},
9};
10
11/// Engine API stream wrapper that skips the specified number of new payload messages.
12#[derive(Debug)]
13#[pin_project::pin_project]
14pub struct EngineSkipNewPayload<S> {
15    #[pin]
16    stream: S,
17    /// The number of messages to skip.
18    threshold: usize,
19    /// Current count of skipped messages.
20    skipped: usize,
21}
22
23impl<S> EngineSkipNewPayload<S> {
24    /// Creates new [`EngineSkipNewPayload`] stream wrapper.
25    pub const fn new(stream: S, threshold: usize) -> Self {
26        Self { stream, threshold, skipped: 0 }
27    }
28}
29
30impl<S, Engine> Stream for EngineSkipNewPayload<S>
31where
32    S: Stream<Item = BeaconEngineMessage<Engine>>,
33    Engine: EngineTypes,
34{
35    type Item = S::Item;
36
37    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38        let mut this = self.project();
39
40        loop {
41            let next = ready!(this.stream.poll_next_unpin(cx));
42            let item = match next {
43                Some(BeaconEngineMessage::NewPayload { payload, tx }) => {
44                    if this.skipped < this.threshold {
45                        *this.skipped += 1;
46                        tracing::warn!(
47                            target: "engine::stream::skip_new_payload",
48                            block_number = payload.block_number(),
49                            block_hash = %payload.block_hash(),
50                            ?payload,
51                            threshold=this.threshold,
52                            skipped=this.skipped, "Skipping new payload"
53                        );
54                        let _ = tx.send(Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)));
55                        continue
56                    }
57                    *this.skipped = 0;
58                    Some(BeaconEngineMessage::NewPayload { payload, tx })
59                }
60                next => next,
61            };
62            return Poll::Ready(item)
63        }
64    }
65}