1use futures::{Stream, StreamExt};
2use pin_project::pin_project;
3use reth_chainspec::EthChainSpec;
4use reth_consensus::{ConsensusError, FullConsensus};
5use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineValidator};
6use reth_engine_tree::{
7 backfill::PipelineSync,
8 download::BasicBlockDownloader,
9 engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
10 persistence::PersistenceHandle,
11 tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
12};
13pub use reth_engine_tree::{
14 chain::{ChainEvent, ChainOrchestrator},
15 engine::EngineApiEvent,
16};
17use reth_ethereum_primitives::EthPrimitives;
18use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
19use reth_network_p2p::BlockClient;
20use reth_node_types::{BlockTy, NodeTypes, NodeTypesWithEngine};
21use reth_payload_builder::PayloadBuilderHandle;
22use reth_provider::{
23 providers::{BlockchainProvider, EngineNodeTypes},
24 ProviderFactory,
25};
26use reth_prune::PrunerWithFactory;
27use reth_stages_api::{MetricEventsSender, Pipeline};
28use reth_tasks::TaskSpawner;
29use std::{
30 marker::PhantomData,
31 pin::Pin,
32 sync::Arc,
33 task::{Context, Poll},
34};
35
36pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
38
39type EngineServiceType<N, Client> = ChainOrchestrator<
41 EngineHandler<
42 EngineApiRequestHandler<
43 EngineApiRequest<<N as NodeTypesWithEngine>::Engine, <N as NodeTypes>::Primitives>,
44 <N as NodeTypes>::Primitives,
45 >,
46 EngineMessageStream<<N as NodeTypesWithEngine>::Engine>,
47 BasicBlockDownloader<Client, BlockTy<N>>,
48 >,
49 PipelineSync<N>,
50>;
51
52#[pin_project]
54#[allow(missing_debug_implementations)]
55#[doc(hidden)]
58pub struct EngineService<N, Client, E>
59where
60 N: EngineNodeTypes,
61 Client: BlockClient<Block = BlockTy<N>> + 'static,
62 E: BlockExecutorProvider + 'static,
63{
64 orchestrator: EngineServiceType<N, Client>,
65 _marker: PhantomData<E>,
66}
67
68impl<N, Client, E> EngineService<N, Client, E>
69where
70 N: EngineNodeTypes,
71 Client: BlockClient<Block = BlockTy<N>> + 'static,
72 E: BlockExecutorProvider<Primitives = N::Primitives> + 'static,
73{
74 #[allow(clippy::too_many_arguments)]
76 pub fn new<V, C>(
77 consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
78 executor_factory: E,
79 chain_spec: Arc<N::ChainSpec>,
80 client: Client,
81 incoming_requests: EngineMessageStream<N::Engine>,
82 pipeline: Pipeline<N>,
83 pipeline_task_spawner: Box<dyn TaskSpawner>,
84 provider: ProviderFactory<N>,
85 blockchain_db: BlockchainProvider<N>,
86 pruner: PrunerWithFactory<ProviderFactory<N>>,
87 payload_builder: PayloadBuilderHandle<N::Engine>,
88 payload_validator: V,
89 tree_config: TreeConfig,
90 invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
91 sync_metrics_tx: MetricEventsSender,
92 evm_config: C,
93 ) -> Self
94 where
95 V: EngineValidator<N::Engine, Block = BlockTy<N>>,
96 C: ConfigureEvm<Primitives = N::Primitives> + 'static,
97 {
98 let engine_kind =
99 if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
100
101 let downloader = BasicBlockDownloader::new(client, consensus.clone().as_consensus());
102
103 let persistence_handle =
104 PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
105
106 let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
107
108 let (to_tree_tx, from_tree) =
109 EngineApiTreeHandler::<N::Primitives, _, _, _, _, _>::spawn_new(
110 blockchain_db,
111 executor_factory,
112 consensus,
113 payload_validator,
114 persistence_handle,
115 payload_builder,
116 canonical_in_memory_state,
117 tree_config,
118 invalid_block_hook,
119 engine_kind,
120 evm_config,
121 );
122
123 let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
124 let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
125
126 let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
127
128 Self {
129 orchestrator: ChainOrchestrator::new(handler, backfill_sync),
130 _marker: Default::default(),
131 }
132 }
133
134 pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
136 &mut self.orchestrator
137 }
138}
139
140impl<N, Client, E> Stream for EngineService<N, Client, E>
141where
142 N: EngineNodeTypes,
143 Client: BlockClient<Block = BlockTy<N>> + 'static,
144 E: BlockExecutorProvider + 'static,
145{
146 type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
147
148 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149 let mut orchestrator = self.project().orchestrator;
150 StreamExt::poll_next_unpin(&mut orchestrator, cx)
151 }
152}
153
154#[derive(Debug, thiserror::Error)]
156#[error("Engine service error.")]
157pub struct EngineServiceError {}
158
159#[cfg(test)]
160mod tests {
161 use super::*;
162 use reth_chainspec::{ChainSpecBuilder, MAINNET};
163 use reth_engine_primitives::BeaconEngineMessage;
164 use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::NoopInvalidBlockHook};
165 use reth_ethereum_consensus::EthBeaconConsensus;
166 use reth_ethereum_engine_primitives::EthEngineTypes;
167 use reth_evm_ethereum::{execute::EthExecutorProvider, EthEvmConfig};
168 use reth_exex_types::FinishedExExHeight;
169 use reth_network_p2p::test_utils::TestFullBlockClient;
170 use reth_node_ethereum::EthereumEngineValidator;
171 use reth_primitives_traits::SealedHeader;
172 use reth_provider::{
173 providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
174 };
175 use reth_prune::Pruner;
176 use reth_tasks::TokioTaskExecutor;
177 use std::sync::Arc;
178 use tokio::sync::{mpsc::unbounded_channel, watch};
179 use tokio_stream::wrappers::UnboundedReceiverStream;
180
181 #[test]
182 fn eth_chain_orchestrator_build() {
183 let chain_spec = Arc::new(
184 ChainSpecBuilder::default()
185 .chain(MAINNET.chain)
186 .genesis(MAINNET.genesis.clone())
187 .paris_activated()
188 .build(),
189 );
190 let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
191
192 let client = TestFullBlockClient::default();
193
194 let (_tx, rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
195 let incoming_requests = UnboundedReceiverStream::new(rx);
196
197 let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
198 let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
199 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
200
201 let executor_factory = EthExecutorProvider::ethereum(chain_spec.clone());
202 let blockchain_db =
203 BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
204 .unwrap();
205 let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());
206 let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
207 let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
208 let evm_config = EthEvmConfig::new(chain_spec.clone());
209
210 let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
211 let (tx, _rx) = unbounded_channel();
212 let _eth_service = EngineService::new(
213 consensus,
214 executor_factory,
215 chain_spec,
216 client,
217 Box::pin(incoming_requests),
218 pipeline,
219 pipeline_task_spawner,
220 provider_factory,
221 blockchain_db,
222 pruner,
223 PayloadBuilderHandle::new(tx),
224 engine_payload_validator,
225 TreeConfig::default(),
226 Box::new(NoopInvalidBlockHook::default()),
227 sync_metrics_tx,
228 evm_config,
229 );
230 }
231}