1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
9#![cfg_attr(not(test), warn(unused_crate_dependencies))]
10
11use futures::{Future, Stream};
12use reth_engine_primitives::BeaconEngineMessage;
13use reth_payload_primitives::PayloadTypes;
14use std::path::PathBuf;
15use tokio_util::either::Either;
16
17pub mod engine_store;
18use engine_store::EngineStoreStream;
19
20pub mod skip_fcu;
21use skip_fcu::EngineSkipFcu;
22
23pub mod skip_new_payload;
24use skip_new_payload::EngineSkipNewPayload;
25
26pub mod reorg;
27use reorg::EngineReorg;
28
29type MaybeReorgResult<S, T, Provider, Evm, Validator, E> =
31 Result<Either<EngineReorg<S, T, Provider, Evm, Validator>, S>, E>;
32
33pub trait EngineMessageStreamExt<T: PayloadTypes>: Stream<Item = BeaconEngineMessage<T>> {
35 fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
38 where
39 Self: Sized,
40 {
41 EngineSkipFcu::new(self, count)
42 }
43
44 fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
47 where
48 Self: Sized,
49 {
50 if let Some(count) = maybe_count {
51 Either::Left(self.skip_fcu(count))
52 } else {
53 Either::Right(self)
54 }
55 }
56
57 fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
60 where
61 Self: Sized,
62 {
63 EngineSkipNewPayload::new(self, count)
64 }
65
66 fn maybe_skip_new_payload(
69 self,
70 maybe_count: Option<usize>,
71 ) -> Either<EngineSkipNewPayload<Self>, Self>
72 where
73 Self: Sized,
74 {
75 if let Some(count) = maybe_count {
76 Either::Left(self.skip_new_payload(count))
77 } else {
78 Either::Right(self)
79 }
80 }
81
82 fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
84 where
85 Self: Sized,
86 {
87 EngineStoreStream::new(self, path)
88 }
89
90 fn maybe_store_messages(
93 self,
94 maybe_path: Option<PathBuf>,
95 ) -> Either<EngineStoreStream<Self>, Self>
96 where
97 Self: Sized,
98 {
99 if let Some(path) = maybe_path {
100 Either::Left(self.store_messages(path))
101 } else {
102 Either::Right(self)
103 }
104 }
105
106 fn reorg<Provider, Evm, Validator>(
108 self,
109 provider: Provider,
110 evm_config: Evm,
111 payload_validator: Validator,
112 frequency: usize,
113 depth: Option<usize>,
114 ) -> EngineReorg<Self, T, Provider, Evm, Validator>
115 where
116 Self: Sized,
117 {
118 EngineReorg::new(
119 self,
120 provider,
121 evm_config,
122 payload_validator,
123 frequency,
124 depth.unwrap_or_default(),
125 )
126 }
127
128 fn maybe_reorg<Provider, Evm, Validator, E, F, Fut>(
134 self,
135 provider: Provider,
136 evm_config: Evm,
137 payload_validator_fn: F,
138 frequency: Option<usize>,
139 depth: Option<usize>,
140 ) -> impl Future<Output = MaybeReorgResult<Self, T, Provider, Evm, Validator, E>> + Send
141 where
142 Self: Sized + Send,
143 Provider: Send,
144 Evm: Send,
145 F: FnOnce() -> Fut + Send,
146 Fut: Future<Output = Result<Validator, E>> + Send,
147 {
148 async move {
149 if let Some(frequency) = frequency {
150 let validator = payload_validator_fn().await?;
151 Ok(Either::Left(reorg::EngineReorg::new(
152 self,
153 provider,
154 evm_config,
155 validator,
156 frequency,
157 depth.unwrap_or_default(),
158 )))
159 } else {
160 Ok(Either::Right(self))
161 }
162 }
163 }
164}
165
166impl<T, S> EngineMessageStreamExt<T> for S
167where
168 T: PayloadTypes,
169 S: Stream<Item = BeaconEngineMessage<T>>,
170{
171}