1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//! Collection of various stream utilities for consensus engine.

use futures::Stream;
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_payload_validator::ExecutionPayloadValidator;
use std::path::PathBuf;
use tokio_util::either::Either;

pub mod engine_store;
use engine_store::EngineStoreStream;

pub mod skip_fcu;
use skip_fcu::EngineSkipFcu;

pub mod skip_new_payload;
use skip_new_payload::EngineSkipNewPayload;

pub mod reorg;
use reorg::EngineReorg;

/// The collection of stream extensions for engine API message stream.
pub trait EngineMessageStreamExt<Engine: EngineTypes>:
    Stream<Item = BeaconEngineMessage<Engine>>
{
    /// Skips the specified number of [`BeaconEngineMessage::ForkchoiceUpdated`] messages from the
    /// engine message stream.
    fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
    where
        Self: Sized,
    {
        EngineSkipFcu::new(self, count)
    }

    /// If the count is [Some], returns the stream that skips the specified number of
    /// [`BeaconEngineMessage::ForkchoiceUpdated`] messages. Otherwise, returns `Self`.
    fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
    where
        Self: Sized,
    {
        if let Some(count) = maybe_count {
            Either::Left(self.skip_fcu(count))
        } else {
            Either::Right(self)
        }
    }

    /// Skips the specified number of [`BeaconEngineMessage::NewPayload`] messages from the
    /// engine message stream.
    fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
    where
        Self: Sized,
    {
        EngineSkipNewPayload::new(self, count)
    }

    /// If the count is [Some], returns the stream that skips the specified number of
    /// [`BeaconEngineMessage::NewPayload`] messages. Otherwise, returns `Self`.
    fn maybe_skip_new_payload(
        self,
        maybe_count: Option<usize>,
    ) -> Either<EngineSkipNewPayload<Self>, Self>
    where
        Self: Sized,
    {
        if let Some(count) = maybe_count {
            Either::Left(self.skip_new_payload(count))
        } else {
            Either::Right(self)
        }
    }

    /// Stores engine messages at the specified location.
    fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
    where
        Self: Sized,
    {
        EngineStoreStream::new(self, path)
    }

    /// If the path is [Some], returns the stream that stores engine messages at the specified
    /// location. Otherwise, returns `Self`.
    fn maybe_store_messages(
        self,
        maybe_path: Option<PathBuf>,
    ) -> Either<EngineStoreStream<Self>, Self>
    where
        Self: Sized,
    {
        if let Some(path) = maybe_path {
            Either::Left(self.store_messages(path))
        } else {
            Either::Right(self)
        }
    }

    /// Creates reorgs with specified frequency.
    fn reorg<Provider, Evm>(
        self,
        provider: Provider,
        evm_config: Evm,
        payload_validator: ExecutionPayloadValidator,
        frequency: usize,
        depth: Option<usize>,
    ) -> EngineReorg<Self, Engine, Provider, Evm>
    where
        Self: Sized,
    {
        EngineReorg::new(
            self,
            provider,
            evm_config,
            payload_validator,
            frequency,
            depth.unwrap_or_default(),
        )
    }

    /// If frequency is [Some], returns the stream that creates reorgs with
    /// specified frequency. Otherwise, returns `Self`.
    fn maybe_reorg<Provider, Evm>(
        self,
        provider: Provider,
        evm_config: Evm,
        payload_validator: ExecutionPayloadValidator,
        frequency: Option<usize>,
        depth: Option<usize>,
    ) -> Either<EngineReorg<Self, Engine, Provider, Evm>, Self>
    where
        Self: Sized,
    {
        if let Some(frequency) = frequency {
            Either::Left(reorg::EngineReorg::new(
                self,
                provider,
                evm_config,
                payload_validator,
                frequency,
                depth.unwrap_or_default(),
            ))
        } else {
            Either::Right(self)
        }
    }
}

impl<Engine, T> EngineMessageStreamExt<Engine> for T
where
    Engine: EngineTypes,
    T: Stream<Item = BeaconEngineMessage<Engine>>,
{
}