reth_engine_util/
skip_new_payload.rs
1use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, ExecutionPayload};
6use reth_payload_primitives::PayloadTypes;
7use std::{
8 pin::Pin,
9 task::{ready, Context, Poll},
10};
11
12#[derive(Debug)]
14#[pin_project::pin_project]
15pub struct EngineSkipNewPayload<S> {
16 #[pin]
17 stream: S,
18 threshold: usize,
20 skipped: usize,
22}
23
24impl<S> EngineSkipNewPayload<S> {
25 pub const fn new(stream: S, threshold: usize) -> Self {
27 Self { stream, threshold, skipped: 0 }
28 }
29}
30
31impl<S, T> Stream for EngineSkipNewPayload<S>
32where
33 S: Stream<Item = BeaconEngineMessage<T>>,
34 T: PayloadTypes,
35{
36 type Item = S::Item;
37
38 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39 let mut this = self.project();
40
41 loop {
42 let next = ready!(this.stream.poll_next_unpin(cx));
43 let item = match next {
44 Some(BeaconEngineMessage::NewPayload { payload, tx }) => {
45 if this.skipped < this.threshold {
46 *this.skipped += 1;
47 tracing::warn!(
48 target: "engine::stream::skip_new_payload",
49 block_number = payload.block_number(),
50 block_hash = %payload.block_hash(),
51 ?payload,
52 threshold=this.threshold,
53 skipped=this.skipped, "Skipping new payload"
54 );
55 let _ = tx.send(Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)));
56 continue
57 }
58 *this.skipped = 0;
59 Some(BeaconEngineMessage::NewPayload { payload, tx })
60 }
61 next => next,
62 };
63 return Poll::Ready(item)
64 }
65 }
66}