reth_engine_util/
lib.rs
1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
9#![cfg_attr(not(test), warn(unused_crate_dependencies))]
10
11use futures::Stream;
12use reth_engine_primitives::BeaconEngineMessage;
13use reth_payload_primitives::PayloadTypes;
14use std::path::PathBuf;
15use tokio_util::either::Either;
16
17pub mod engine_store;
18use engine_store::EngineStoreStream;
19
20pub mod skip_fcu;
21use skip_fcu::EngineSkipFcu;
22
23pub mod skip_new_payload;
24use skip_new_payload::EngineSkipNewPayload;
25
26pub mod reorg;
27use reorg::EngineReorg;
28
29pub trait EngineMessageStreamExt<T: PayloadTypes>: Stream<Item = BeaconEngineMessage<T>> {
31 fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
34 where
35 Self: Sized,
36 {
37 EngineSkipFcu::new(self, count)
38 }
39
40 fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
43 where
44 Self: Sized,
45 {
46 if let Some(count) = maybe_count {
47 Either::Left(self.skip_fcu(count))
48 } else {
49 Either::Right(self)
50 }
51 }
52
53 fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
56 where
57 Self: Sized,
58 {
59 EngineSkipNewPayload::new(self, count)
60 }
61
62 fn maybe_skip_new_payload(
65 self,
66 maybe_count: Option<usize>,
67 ) -> Either<EngineSkipNewPayload<Self>, Self>
68 where
69 Self: Sized,
70 {
71 if let Some(count) = maybe_count {
72 Either::Left(self.skip_new_payload(count))
73 } else {
74 Either::Right(self)
75 }
76 }
77
78 fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
80 where
81 Self: Sized,
82 {
83 EngineStoreStream::new(self, path)
84 }
85
86 fn maybe_store_messages(
89 self,
90 maybe_path: Option<PathBuf>,
91 ) -> Either<EngineStoreStream<Self>, Self>
92 where
93 Self: Sized,
94 {
95 if let Some(path) = maybe_path {
96 Either::Left(self.store_messages(path))
97 } else {
98 Either::Right(self)
99 }
100 }
101
102 fn reorg<Provider, Evm, Validator>(
104 self,
105 provider: Provider,
106 evm_config: Evm,
107 payload_validator: Validator,
108 frequency: usize,
109 depth: Option<usize>,
110 ) -> EngineReorg<Self, T, Provider, Evm, Validator>
111 where
112 Self: Sized,
113 {
114 EngineReorg::new(
115 self,
116 provider,
117 evm_config,
118 payload_validator,
119 frequency,
120 depth.unwrap_or_default(),
121 )
122 }
123
124 fn maybe_reorg<Provider, Evm, Validator>(
127 self,
128 provider: Provider,
129 evm_config: Evm,
130 payload_validator: Validator,
131 frequency: Option<usize>,
132 depth: Option<usize>,
133 ) -> Either<EngineReorg<Self, T, Provider, Evm, Validator>, Self>
134 where
135 Self: Sized,
136 {
137 if let Some(frequency) = frequency {
138 Either::Left(reorg::EngineReorg::new(
139 self,
140 provider,
141 evm_config,
142 payload_validator,
143 frequency,
144 depth.unwrap_or_default(),
145 ))
146 } else {
147 Either::Right(self)
148 }
149 }
150}
151
152impl<T, S> EngineMessageStreamExt<T> for S
153where
154 T: PayloadTypes,
155 S: Stream<Item = BeaconEngineMessage<T>>,
156{
157}