reth_engine_util/
lib.rs

1//! Collection of various stream utilities for consensus engine.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
9#![cfg_attr(not(test), warn(unused_crate_dependencies))]
10
11use futures::{Future, Stream};
12use reth_engine_primitives::BeaconEngineMessage;
13use reth_payload_primitives::PayloadTypes;
14use std::path::PathBuf;
15use tokio_util::either::Either;
16
17pub mod engine_store;
18use engine_store::EngineStoreStream;
19
20pub mod skip_fcu;
21use skip_fcu::EngineSkipFcu;
22
23pub mod skip_new_payload;
24use skip_new_payload::EngineSkipNewPayload;
25
26pub mod reorg;
27use reorg::EngineReorg;
28
29/// The result type for `maybe_reorg` method.
30type MaybeReorgResult<S, T, Provider, Evm, Validator, E> =
31    Result<Either<EngineReorg<S, T, Provider, Evm, Validator>, S>, E>;
32
33/// The collection of stream extensions for engine API message stream.
34pub trait EngineMessageStreamExt<T: PayloadTypes>: Stream<Item = BeaconEngineMessage<T>> {
35    /// Skips the specified number of [`BeaconEngineMessage::ForkchoiceUpdated`] messages from the
36    /// engine message stream.
37    fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
38    where
39        Self: Sized,
40    {
41        EngineSkipFcu::new(self, count)
42    }
43
44    /// If the count is [Some], returns the stream that skips the specified number of
45    /// [`BeaconEngineMessage::ForkchoiceUpdated`] messages. Otherwise, returns `Self`.
46    fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
47    where
48        Self: Sized,
49    {
50        if let Some(count) = maybe_count {
51            Either::Left(self.skip_fcu(count))
52        } else {
53            Either::Right(self)
54        }
55    }
56
57    /// Skips the specified number of [`BeaconEngineMessage::NewPayload`] messages from the
58    /// engine message stream.
59    fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
60    where
61        Self: Sized,
62    {
63        EngineSkipNewPayload::new(self, count)
64    }
65
66    /// If the count is [Some], returns the stream that skips the specified number of
67    /// [`BeaconEngineMessage::NewPayload`] messages. Otherwise, returns `Self`.
68    fn maybe_skip_new_payload(
69        self,
70        maybe_count: Option<usize>,
71    ) -> Either<EngineSkipNewPayload<Self>, Self>
72    where
73        Self: Sized,
74    {
75        if let Some(count) = maybe_count {
76            Either::Left(self.skip_new_payload(count))
77        } else {
78            Either::Right(self)
79        }
80    }
81
82    /// Stores engine messages at the specified location.
83    fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
84    where
85        Self: Sized,
86    {
87        EngineStoreStream::new(self, path)
88    }
89
90    /// If the path is [Some], returns the stream that stores engine messages at the specified
91    /// location. Otherwise, returns `Self`.
92    fn maybe_store_messages(
93        self,
94        maybe_path: Option<PathBuf>,
95    ) -> Either<EngineStoreStream<Self>, Self>
96    where
97        Self: Sized,
98    {
99        if let Some(path) = maybe_path {
100            Either::Left(self.store_messages(path))
101        } else {
102            Either::Right(self)
103        }
104    }
105
106    /// Creates reorgs with specified frequency.
107    fn reorg<Provider, Evm, Validator>(
108        self,
109        provider: Provider,
110        evm_config: Evm,
111        payload_validator: Validator,
112        frequency: usize,
113        depth: Option<usize>,
114    ) -> EngineReorg<Self, T, Provider, Evm, Validator>
115    where
116        Self: Sized,
117    {
118        EngineReorg::new(
119            self,
120            provider,
121            evm_config,
122            payload_validator,
123            frequency,
124            depth.unwrap_or_default(),
125        )
126    }
127
128    /// If frequency is [Some], returns the stream that creates reorgs with
129    /// specified frequency. Otherwise, returns `Self`.
130    ///
131    /// The `payload_validator_fn` closure is only called if `frequency` is `Some`,
132    /// allowing for lazy initialization of the validator.
133    fn maybe_reorg<Provider, Evm, Validator, E, F, Fut>(
134        self,
135        provider: Provider,
136        evm_config: Evm,
137        payload_validator_fn: F,
138        frequency: Option<usize>,
139        depth: Option<usize>,
140    ) -> impl Future<Output = MaybeReorgResult<Self, T, Provider, Evm, Validator, E>> + Send
141    where
142        Self: Sized + Send,
143        Provider: Send,
144        Evm: Send,
145        F: FnOnce() -> Fut + Send,
146        Fut: Future<Output = Result<Validator, E>> + Send,
147    {
148        async move {
149            if let Some(frequency) = frequency {
150                let validator = payload_validator_fn().await?;
151                Ok(Either::Left(reorg::EngineReorg::new(
152                    self,
153                    provider,
154                    evm_config,
155                    validator,
156                    frequency,
157                    depth.unwrap_or_default(),
158                )))
159            } else {
160                Ok(Either::Right(self))
161            }
162        }
163    }
164}
165
166impl<T, S> EngineMessageStreamExt<T> for S
167where
168    T: PayloadTypes,
169    S: Stream<Item = BeaconEngineMessage<T>>,
170{
171}