reth_engine_util/
skip_fcu.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//! Stream wrapper that skips specified number of FCUs.

use futures::{Stream, StreamExt};
use reth_engine_primitives::{BeaconEngineMessage, EngineTypes, OnForkChoiceUpdated};
use std::{
    pin::Pin,
    task::{ready, Context, Poll},
};

/// Engine API stream wrapper that skips the specified number of forkchoice updated messages.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineSkipFcu<S> {
    #[pin]
    stream: S,
    /// The number of FCUs to skip.
    threshold: usize,
    /// Current count of skipped FCUs.
    skipped: usize,
}

impl<S> EngineSkipFcu<S> {
    /// Creates new [`EngineSkipFcu`] stream wrapper.
    pub const fn new(stream: S, threshold: usize) -> Self {
        Self {
            stream,
            threshold,
            // Start with `threshold` so that the first FCU goes through.
            skipped: threshold,
        }
    }
}

impl<S, Engine> Stream for EngineSkipFcu<S>
where
    S: Stream<Item = BeaconEngineMessage<Engine>>,
    Engine: EngineTypes,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        loop {
            let next = ready!(this.stream.poll_next_unpin(cx));
            let item = match next {
                Some(BeaconEngineMessage::ForkchoiceUpdated {
                    state,
                    payload_attrs,
                    tx,
                    version,
                }) => {
                    if this.skipped < this.threshold {
                        *this.skipped += 1;
                        tracing::warn!(target: "engine::stream::skip_fcu", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
                        let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
                        continue
                    }
                    *this.skipped = 0;
                    Some(BeaconEngineMessage::ForkchoiceUpdated {
                        state,
                        payload_attrs,
                        tx,
                        version,
                    })
                }
                next => next,
            };
            return Poll::Ready(item)
        }
    }
}