reth_engine_util/
skip_fcu.rs
1use futures::{Stream, StreamExt};
4use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
5use reth_payload_primitives::PayloadTypes;
6use std::{
7 pin::Pin,
8 task::{ready, Context, Poll},
9};
10
11#[derive(Debug)]
13#[pin_project::pin_project]
14pub struct EngineSkipFcu<S> {
15 #[pin]
16 stream: S,
17 threshold: usize,
19 skipped: usize,
21}
22
23impl<S> EngineSkipFcu<S> {
24 pub const fn new(stream: S, threshold: usize) -> Self {
26 Self {
27 stream,
28 threshold,
29 skipped: threshold,
31 }
32 }
33}
34
35impl<S, T> Stream for EngineSkipFcu<S>
36where
37 S: Stream<Item = BeaconEngineMessage<T>>,
38 T: PayloadTypes,
39{
40 type Item = S::Item;
41
42 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 let mut this = self.project();
44
45 loop {
46 let next = ready!(this.stream.poll_next_unpin(cx));
47 let item = match next {
48 Some(BeaconEngineMessage::ForkchoiceUpdated {
49 state,
50 payload_attrs,
51 tx,
52 version,
53 }) => {
54 if this.skipped < this.threshold {
55 *this.skipped += 1;
56 tracing::warn!(target: "engine::stream::skip_fcu", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
57 let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
58 continue
59 }
60 *this.skipped = 0;
61 Some(BeaconEngineMessage::ForkchoiceUpdated {
62 state,
63 payload_attrs,
64 tx,
65 version,
66 })
67 }
68 next => next,
69 };
70 return Poll::Ready(item)
71 }
72 }
73}