reth_engine_util/
skip_fcu.rs

1//! Stream wrapper that skips specified number of FCUs.
2
3use futures::{Stream, StreamExt};
4use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
5use reth_payload_primitives::PayloadTypes;
6use std::{
7    pin::Pin,
8    task::{ready, Context, Poll},
9};
10
11/// Engine API stream wrapper that skips the specified number of forkchoice updated messages.
12#[derive(Debug)]
13#[pin_project::pin_project]
14pub struct EngineSkipFcu<S> {
15    #[pin]
16    stream: S,
17    /// The number of FCUs to skip.
18    threshold: usize,
19    /// Current count of skipped FCUs.
20    skipped: usize,
21}
22
23impl<S> EngineSkipFcu<S> {
24    /// Creates new [`EngineSkipFcu`] stream wrapper.
25    pub const fn new(stream: S, threshold: usize) -> Self {
26        Self {
27            stream,
28            threshold,
29            // Start with `threshold` so that the first FCU goes through.
30            skipped: threshold,
31        }
32    }
33}
34
35impl<S, T> Stream for EngineSkipFcu<S>
36where
37    S: Stream<Item = BeaconEngineMessage<T>>,
38    T: PayloadTypes,
39{
40    type Item = S::Item;
41
42    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        let mut this = self.project();
44
45        loop {
46            let next = ready!(this.stream.poll_next_unpin(cx));
47            let item = match next {
48                Some(BeaconEngineMessage::ForkchoiceUpdated {
49                    state,
50                    payload_attrs,
51                    tx,
52                    version,
53                }) => {
54                    if this.skipped < this.threshold {
55                        *this.skipped += 1;
56                        tracing::warn!(target: "engine::stream::skip_fcu", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
57                        let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
58                        continue
59                    }
60                    *this.skipped = 0;
61                    Some(BeaconEngineMessage::ForkchoiceUpdated {
62                        state,
63                        payload_attrs,
64                        tx,
65                        version,
66                    })
67                }
68                next => next,
69            };
70            return Poll::Ready(item)
71        }
72    }
73}