use futures::Stream;
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_payload_validator::ExecutionPayloadValidator;
use std::path::PathBuf;
use tokio_util::either::Either;
pub mod engine_store;
use engine_store::EngineStoreStream;
pub mod skip_fcu;
use skip_fcu::EngineSkipFcu;
pub mod skip_new_payload;
use skip_new_payload::EngineSkipNewPayload;
pub mod reorg;
use reorg::EngineReorg;
/// Extension methods for any [`Stream`] that yields [`BeaconEngineMessage`]s.
///
/// Each method consumes the stream and hands it to one of the adapter types declared in this
/// module's submodules. The plain methods always wrap the stream. The `maybe_*` variants take
/// an `Option` and return `Either::Left` with the wrapped stream when it is `Some`, or
/// `Either::Right` with the untouched stream when it is `None`, so callers can chain them
/// without branching on configuration themselves.
pub trait EngineMessageStreamExt<Engine: EngineTypes>:
    Stream<Item = BeaconEngineMessage<Engine>>
{
    /// Wraps this stream in an [`EngineSkipFcu`] constructed from the stream and `count`.
    fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
    where
        Self: Sized,
    {
        EngineSkipFcu::new(self, count)
    }

    /// Applies [`Self::skip_fcu`] when `maybe_count` is `Some`; otherwise returns the stream
    /// unchanged in `Either::Right`.
    fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
    where
        Self: Sized,
    {
        match maybe_count {
            Some(count) => Either::Left(self.skip_fcu(count)),
            None => Either::Right(self),
        }
    }

    /// Wraps this stream in an [`EngineSkipNewPayload`] constructed from the stream and
    /// `count`.
    fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
    where
        Self: Sized,
    {
        EngineSkipNewPayload::new(self, count)
    }

    /// Applies [`Self::skip_new_payload`] when `maybe_count` is `Some`; otherwise returns the
    /// stream unchanged in `Either::Right`.
    fn maybe_skip_new_payload(
        self,
        maybe_count: Option<usize>,
    ) -> Either<EngineSkipNewPayload<Self>, Self>
    where
        Self: Sized,
    {
        match maybe_count {
            Some(count) => Either::Left(self.skip_new_payload(count)),
            None => Either::Right(self),
        }
    }

    /// Wraps this stream in an [`EngineStoreStream`] constructed from the stream and `path`.
    fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
    where
        Self: Sized,
    {
        EngineStoreStream::new(self, path)
    }

    /// Applies [`Self::store_messages`] when `maybe_path` is `Some`; otherwise returns the
    /// stream unchanged in `Either::Right`.
    fn maybe_store_messages(
        self,
        maybe_path: Option<PathBuf>,
    ) -> Either<EngineStoreStream<Self>, Self>
    where
        Self: Sized,
    {
        match maybe_path {
            Some(path) => Either::Left(self.store_messages(path)),
            None => Either::Right(self),
        }
    }

    /// Wraps this stream in an [`EngineReorg`] built from the given provider, EVM
    /// configuration, payload validator and `frequency`.
    ///
    /// When `depth` is `None`, the default `usize` value (`0`) is passed to the adapter.
    fn reorg<Provider, Evm, Spec>(
        self,
        provider: Provider,
        evm_config: Evm,
        payload_validator: ExecutionPayloadValidator<Spec>,
        frequency: usize,
        depth: Option<usize>,
    ) -> EngineReorg<Self, Engine, Provider, Evm, Spec>
    where
        Self: Sized,
    {
        EngineReorg::new(
            self,
            provider,
            evm_config,
            payload_validator,
            frequency,
            depth.unwrap_or_default(),
        )
    }

    /// Applies [`Self::reorg`] when `frequency` is `Some`; otherwise returns the stream
    /// unchanged in `Either::Right`.
    ///
    /// In the `None` case `provider`, `evm_config`, `payload_validator` and `depth` are simply
    /// dropped without being used.
    fn maybe_reorg<Provider, Evm, Spec>(
        self,
        provider: Provider,
        evm_config: Evm,
        payload_validator: ExecutionPayloadValidator<Spec>,
        frequency: Option<usize>,
        depth: Option<usize>,
    ) -> Either<EngineReorg<Self, Engine, Provider, Evm, Spec>, Self>
    where
        Self: Sized,
    {
        match frequency {
            // Delegate to `reorg` so the depth default lives in exactly one place, matching
            // how the other `maybe_*` helpers forward to their unconditional counterparts.
            Some(frequency) => {
                Either::Left(self.reorg(provider, evm_config, payload_validator, frequency, depth))
            }
            None => Either::Right(self),
        }
    }
}
// Blanket implementation: every stream of `BeaconEngineMessage<Engine>` gets the extension
// methods through the trait's default method bodies, so nothing needs to be overridden here.
impl<Engine: EngineTypes, T: Stream<Item = BeaconEngineMessage<Engine>>>
    EngineMessageStreamExt<Engine> for T
{
}