reth_engine_util/
lib.rs

1//! Collection of various stream utilities for consensus engine.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
9#![cfg_attr(not(test), warn(unused_crate_dependencies))]
10
11use futures::Stream;
12use reth_engine_primitives::BeaconEngineMessage;
13use reth_payload_primitives::PayloadTypes;
14use std::path::PathBuf;
15use tokio_util::either::Either;
16
17pub mod engine_store;
18use engine_store::EngineStoreStream;
19
20pub mod skip_fcu;
21use skip_fcu::EngineSkipFcu;
22
23pub mod skip_new_payload;
24use skip_new_payload::EngineSkipNewPayload;
25
26pub mod reorg;
27use reorg::EngineReorg;
28
29/// The collection of stream extensions for engine API message stream.
30pub trait EngineMessageStreamExt<T: PayloadTypes>: Stream<Item = BeaconEngineMessage<T>> {
31    /// Skips the specified number of [`BeaconEngineMessage::ForkchoiceUpdated`] messages from the
32    /// engine message stream.
33    fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
34    where
35        Self: Sized,
36    {
37        EngineSkipFcu::new(self, count)
38    }
39
40    /// If the count is [Some], returns the stream that skips the specified number of
41    /// [`BeaconEngineMessage::ForkchoiceUpdated`] messages. Otherwise, returns `Self`.
42    fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
43    where
44        Self: Sized,
45    {
46        if let Some(count) = maybe_count {
47            Either::Left(self.skip_fcu(count))
48        } else {
49            Either::Right(self)
50        }
51    }
52
53    /// Skips the specified number of [`BeaconEngineMessage::NewPayload`] messages from the
54    /// engine message stream.
55    fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
56    where
57        Self: Sized,
58    {
59        EngineSkipNewPayload::new(self, count)
60    }
61
62    /// If the count is [Some], returns the stream that skips the specified number of
63    /// [`BeaconEngineMessage::NewPayload`] messages. Otherwise, returns `Self`.
64    fn maybe_skip_new_payload(
65        self,
66        maybe_count: Option<usize>,
67    ) -> Either<EngineSkipNewPayload<Self>, Self>
68    where
69        Self: Sized,
70    {
71        if let Some(count) = maybe_count {
72            Either::Left(self.skip_new_payload(count))
73        } else {
74            Either::Right(self)
75        }
76    }
77
78    /// Stores engine messages at the specified location.
79    fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
80    where
81        Self: Sized,
82    {
83        EngineStoreStream::new(self, path)
84    }
85
86    /// If the path is [Some], returns the stream that stores engine messages at the specified
87    /// location. Otherwise, returns `Self`.
88    fn maybe_store_messages(
89        self,
90        maybe_path: Option<PathBuf>,
91    ) -> Either<EngineStoreStream<Self>, Self>
92    where
93        Self: Sized,
94    {
95        if let Some(path) = maybe_path {
96            Either::Left(self.store_messages(path))
97        } else {
98            Either::Right(self)
99        }
100    }
101
102    /// Creates reorgs with specified frequency.
103    fn reorg<Provider, Evm, Validator>(
104        self,
105        provider: Provider,
106        evm_config: Evm,
107        payload_validator: Validator,
108        frequency: usize,
109        depth: Option<usize>,
110    ) -> EngineReorg<Self, T, Provider, Evm, Validator>
111    where
112        Self: Sized,
113    {
114        EngineReorg::new(
115            self,
116            provider,
117            evm_config,
118            payload_validator,
119            frequency,
120            depth.unwrap_or_default(),
121        )
122    }
123
124    /// If frequency is [Some], returns the stream that creates reorgs with
125    /// specified frequency. Otherwise, returns `Self`.
126    fn maybe_reorg<Provider, Evm, Validator>(
127        self,
128        provider: Provider,
129        evm_config: Evm,
130        payload_validator: Validator,
131        frequency: Option<usize>,
132        depth: Option<usize>,
133    ) -> Either<EngineReorg<Self, T, Provider, Evm, Validator>, Self>
134    where
135        Self: Sized,
136    {
137        if let Some(frequency) = frequency {
138            Either::Left(reorg::EngineReorg::new(
139                self,
140                provider,
141                evm_config,
142                payload_validator,
143                frequency,
144                depth.unwrap_or_default(),
145            ))
146        } else {
147            Either::Right(self)
148        }
149    }
150}
151
152impl<T, S> EngineMessageStreamExt<T> for S
153where
154    T: PayloadTypes,
155    S: Stream<Item = BeaconEngineMessage<T>>,
156{
157}