reth_engine_util/
skip_new_payload.rs
1use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
4use futures::{Stream, StreamExt};
5use reth_engine_primitives::{BeaconEngineMessage, EngineTypes, ExecutionPayload};
6use std::{
7 pin::Pin,
8 task::{ready, Context, Poll},
9};
10
11#[derive(Debug)]
13#[pin_project::pin_project]
14pub struct EngineSkipNewPayload<S> {
15 #[pin]
16 stream: S,
17 threshold: usize,
19 skipped: usize,
21}
22
23impl<S> EngineSkipNewPayload<S> {
24 pub const fn new(stream: S, threshold: usize) -> Self {
26 Self { stream, threshold, skipped: 0 }
27 }
28}
29
30impl<S, Engine> Stream for EngineSkipNewPayload<S>
31where
32 S: Stream<Item = BeaconEngineMessage<Engine>>,
33 Engine: EngineTypes,
34{
35 type Item = S::Item;
36
37 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38 let mut this = self.project();
39
40 loop {
41 let next = ready!(this.stream.poll_next_unpin(cx));
42 let item = match next {
43 Some(BeaconEngineMessage::NewPayload { payload, tx }) => {
44 if this.skipped < this.threshold {
45 *this.skipped += 1;
46 tracing::warn!(
47 target: "engine::stream::skip_new_payload",
48 block_number = payload.block_number(),
49 block_hash = %payload.block_hash(),
50 ?payload,
51 threshold=this.threshold,
52 skipped=this.skipped, "Skipping new payload"
53 );
54 let _ = tx.send(Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)));
55 continue
56 }
57 *this.skipped = 0;
58 Some(BeaconEngineMessage::NewPayload { payload, tx })
59 }
60 next => next,
61 };
62 return Poll::Ready(item)
63 }
64 }
65}