1use futures::{Stream, StreamExt};
2use pin_project::pin_project;
3use reth_chainspec::EthChainSpec;
4use reth_consensus::FullConsensus;
5use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
6use reth_engine_tree::{
7 backfill::PipelineSync,
8 download::BasicBlockDownloader,
9 engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
10 persistence::PersistenceHandle,
11 tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
12};
13pub use reth_engine_tree::{
14 chain::{ChainEvent, ChainOrchestrator},
15 engine::EngineApiEvent,
16};
17use reth_ethereum_primitives::EthPrimitives;
18use reth_evm::ConfigureEvm;
19use reth_network_p2p::BlockClient;
20use reth_node_types::{BlockTy, NodeTypes};
21use reth_payload_builder::PayloadBuilderHandle;
22use reth_provider::{
23 providers::{BlockchainProvider, ProviderNodeTypes},
24 ProviderFactory,
25};
26use reth_prune::PrunerWithFactory;
27use reth_stages_api::{MetricEventsSender, Pipeline};
28use reth_tasks::TaskSpawner;
29use reth_trie_db::ChangesetCache;
30use std::{
31 pin::Pin,
32 sync::Arc,
33 task::{Context, Poll},
34};
35
/// Boxed, pinned stream of [`BeaconEngineMessage`]s for the payload type `T`.
///
/// The trait object is `Send + Sync`; this is the type [`EngineService::new`] accepts for its
/// `incoming_requests` argument.
pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
38
/// Concrete [`ChainOrchestrator`] instantiation held by [`EngineService`].
///
/// It is parameterised by:
/// - an [`EngineHandler`] built from an [`EngineApiRequestHandler`] (typed over the node's
///   payload and primitives), an [`EngineMessageStream`] of incoming messages, and a
///   [`BasicBlockDownloader`] over `Client`;
/// - a [`PipelineSync`] over the node types `N`.
type EngineServiceType<N, Client> = ChainOrchestrator<
    EngineHandler<
        EngineApiRequestHandler<
            EngineApiRequest<<N as NodeTypes>::Payload, <N as NodeTypes>::Primitives>,
            <N as NodeTypes>::Primitives,
        >,
        EngineMessageStream<<N as NodeTypes>::Payload>,
        BasicBlockDownloader<Client, BlockTy<N>>,
    >,
    PipelineSync<N>,
>;
51
/// Service wrapping the [`ChainOrchestrator`] assembled in [`EngineService::new`].
///
/// The type implements [`Stream`] by forwarding polls to the wrapped orchestrator.
#[pin_project]
// No `Debug` impl is provided for this type; the lint is expected to fire here.
#[expect(missing_debug_implementations)]
// Excluded from the rendered rustdoc output.
#[doc(hidden)]
pub struct EngineService<N, Client>
where
    N: ProviderNodeTypes,
    Client: BlockClient<Block = BlockTy<N>> + 'static,
{
    /// The orchestrator that is polled when this service is polled as a stream.
    orchestrator: EngineServiceType<N, Client>,
}
65
66impl<N, Client> EngineService<N, Client>
67where
68 N: ProviderNodeTypes,
69 Client: BlockClient<Block = BlockTy<N>> + 'static,
70{
71 #[expect(clippy::too_many_arguments)]
73 pub fn new<V, C>(
74 consensus: Arc<dyn FullConsensus<N::Primitives>>,
75 chain_spec: Arc<N::ChainSpec>,
76 client: Client,
77 incoming_requests: EngineMessageStream<N::Payload>,
78 pipeline: Pipeline<N>,
79 pipeline_task_spawner: Box<dyn TaskSpawner>,
80 provider: ProviderFactory<N>,
81 blockchain_db: BlockchainProvider<N>,
82 pruner: PrunerWithFactory<ProviderFactory<N>>,
83 payload_builder: PayloadBuilderHandle<N::Payload>,
84 payload_validator: V,
85 tree_config: TreeConfig,
86 sync_metrics_tx: MetricEventsSender,
87 evm_config: C,
88 changeset_cache: ChangesetCache,
89 ) -> Self
90 where
91 V: EngineValidator<N::Payload>,
92 C: ConfigureEvm<Primitives = N::Primitives> + 'static,
93 {
94 let engine_kind =
95 if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
96
97 let downloader = BasicBlockDownloader::new(client, consensus.clone());
98
99 let persistence_handle =
100 PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
101
102 let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
103
104 let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
105 blockchain_db,
106 consensus,
107 payload_validator,
108 persistence_handle,
109 payload_builder,
110 canonical_in_memory_state,
111 tree_config,
112 engine_kind,
113 evm_config,
114 changeset_cache,
115 );
116
117 let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
118 let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
119
120 let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
121
122 Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) }
123 }
124
125 pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
127 &mut self.orchestrator
128 }
129}
130
131impl<N, Client> Stream for EngineService<N, Client>
132where
133 N: ProviderNodeTypes,
134 Client: BlockClient<Block = BlockTy<N>> + 'static,
135{
136 type Item = ChainEvent<ConsensusEngineEvent<N::Primitives>>;
137
138 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
139 let mut orchestrator = self.project().orchestrator;
140 StreamExt::poll_next_unpin(&mut orchestrator, cx)
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_engine_primitives::{BeaconEngineMessage, NoopInvalidBlockHook};
    use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::BasicEngineValidator};
    use reth_ethereum_consensus::EthBeaconConsensus;
    use reth_ethereum_engine_primitives::EthEngineTypes;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_exex_types::FinishedExExHeight;
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_node_ethereum::EthereumEngineValidator;
    use reth_primitives_traits::SealedHeader;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_prune::Pruner;
    use reth_tasks::TokioTaskExecutor;
    use reth_trie_db::ChangesetCache;
    use std::sync::Arc;
    use tokio::sync::{mpsc::unbounded_channel, watch};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    /// Smoke test: an `EngineService` can be constructed from test fixtures.
    ///
    /// Nothing is polled or asserted; the test passes if construction does not panic.
    #[test]
    fn eth_chain_orchestrator_build() {
        // Mainnet chain id and genesis, with Paris activated.
        let spec_builder = ChainSpecBuilder::default()
            .chain(MAINNET.chain)
            .genesis(MAINNET.genesis.clone())
            .paris_activated();
        let chain_spec = Arc::new(spec_builder.build());
        let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));

        let client = TestFullBlockClient::default();

        // The sender half is bound to a named variable so it lives until the end of the test.
        let (_engine_msg_tx, engine_msg_rx) =
            unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
        let incoming_requests: EngineMessageStream<EthEngineTypes> =
            Box::pin(UnboundedReceiverStream::new(engine_msg_rx));

        let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
        let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());

        let blockchain_db =
            BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
                .unwrap();
        let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());

        let (_exex_height_tx, exex_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
        let pruner =
            Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, exex_height_rx);
        let evm_config = EthEvmConfig::new(chain_spec.clone());

        let changeset_cache = ChangesetCache::new();

        let engine_validator = BasicEngineValidator::new(
            blockchain_db.clone(),
            consensus.clone(),
            evm_config.clone(),
            engine_payload_validator,
            TreeConfig::default(),
            Box::new(NoopInvalidBlockHook::default()),
            changeset_cache.clone(),
        );

        // Receiver halves are kept alive (named, not `_`) for the duration of the test.
        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        let (payload_tx, _payload_rx) = unbounded_channel();
        let payload_builder = PayloadBuilderHandle::new(payload_tx);

        let _eth_service = EngineService::new(
            consensus,
            chain_spec,
            client,
            incoming_requests,
            pipeline,
            pipeline_task_spawner,
            provider_factory,
            blockchain_db,
            pruner,
            payload_builder,
            engine_validator,
            TreeConfig::default(),
            sync_metrics_tx,
            evm_config,
            changeset_cache,
        );
    }
}