Skip to main content

reth_engine_service/
service.rs

1use futures::{Stream, StreamExt};
2use pin_project::pin_project;
3use reth_chainspec::EthChainSpec;
4use reth_consensus::FullConsensus;
5use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
6use reth_engine_tree::{
7    backfill::PipelineSync,
8    download::BasicBlockDownloader,
9    engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
10    persistence::PersistenceHandle,
11    tree::{EngineApiTreeHandler, EngineValidator, TreeConfig},
12};
13pub use reth_engine_tree::{
14    chain::{ChainEvent, ChainOrchestrator},
15    engine::EngineApiEvent,
16};
17use reth_evm::ConfigureEvm;
18use reth_network_p2p::BlockClient;
19use reth_node_types::{BlockTy, NodeTypes};
20use reth_payload_builder::PayloadBuilderHandle;
21use reth_provider::{
22    providers::{BlockchainProvider, ProviderNodeTypes},
23    ProviderFactory, StorageSettingsCache,
24};
25use reth_prune::PrunerWithFactory;
26use reth_stages_api::{MetricEventsSender, Pipeline};
27use reth_tasks::TaskSpawner;
28use reth_trie_db::ChangesetCache;
29use std::{
30    pin::Pin,
31    sync::Arc,
32    task::{Context, Poll},
33};
34
35/// Alias for consensus engine stream.
36pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
37
38/// Alias for chain orchestrator.
39type EngineServiceType<N, Client> = ChainOrchestrator<
40    EngineHandler<
41        EngineApiRequestHandler<
42            EngineApiRequest<<N as NodeTypes>::Payload, <N as NodeTypes>::Primitives>,
43            <N as NodeTypes>::Primitives,
44        >,
45        EngineMessageStream<<N as NodeTypes>::Payload>,
46        BasicBlockDownloader<Client, BlockTy<N>>,
47    >,
48    PipelineSync<N>,
49>;
50
51/// The type that drives the chain forward and communicates progress.
52#[pin_project]
53#[expect(missing_debug_implementations)]
54// TODO(mattsse): remove hidden once fixed : <https://github.com/rust-lang/rust/issues/135363>
55//  otherwise rustdoc fails to resolve the alias
56#[doc(hidden)]
57pub struct EngineService<N, Client>
58where
59    N: ProviderNodeTypes,
60    Client: BlockClient<Block = BlockTy<N>> + 'static,
61{
62    orchestrator: EngineServiceType<N, Client>,
63}
64
65impl<N, Client> EngineService<N, Client>
66where
67    N: ProviderNodeTypes,
68    Client: BlockClient<Block = BlockTy<N>> + 'static,
69{
70    /// Constructor for `EngineService`.
71    #[expect(clippy::too_many_arguments)]
72    pub fn new<V, C>(
73        consensus: Arc<dyn FullConsensus<N::Primitives>>,
74        chain_spec: Arc<N::ChainSpec>,
75        client: Client,
76        incoming_requests: EngineMessageStream<N::Payload>,
77        pipeline: Pipeline<N>,
78        pipeline_task_spawner: Box<dyn TaskSpawner>,
79        provider: ProviderFactory<N>,
80        blockchain_db: BlockchainProvider<N>,
81        pruner: PrunerWithFactory<ProviderFactory<N>>,
82        payload_builder: PayloadBuilderHandle<N::Payload>,
83        payload_validator: V,
84        tree_config: TreeConfig,
85        sync_metrics_tx: MetricEventsSender,
86        evm_config: C,
87        changeset_cache: ChangesetCache,
88    ) -> Self
89    where
90        V: EngineValidator<N::Payload>,
91        C: ConfigureEvm<Primitives = N::Primitives> + 'static,
92    {
93        let engine_kind =
94            if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
95
96        let downloader = BasicBlockDownloader::new(client, consensus.clone());
97        let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
98
99        let persistence_handle =
100            PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
101
102        let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
103
104        let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
105            blockchain_db,
106            consensus,
107            payload_validator,
108            persistence_handle,
109            payload_builder,
110            canonical_in_memory_state,
111            tree_config,
112            engine_kind,
113            evm_config,
114            changeset_cache,
115            use_hashed_state,
116        );
117
118        let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
119        let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
120
121        let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
122
123        Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) }
124    }
125
126    /// Returns a mutable reference to the orchestrator.
127    pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
128        &mut self.orchestrator
129    }
130}
131
132impl<N, Client> Stream for EngineService<N, Client>
133where
134    N: ProviderNodeTypes,
135    Client: BlockClient<Block = BlockTy<N>> + 'static,
136{
137    type Item = ChainEvent<ConsensusEngineEvent<N::Primitives>>;
138
139    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140        let mut orchestrator = self.project().orchestrator;
141        StreamExt::poll_next_unpin(&mut orchestrator, cx)
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_engine_primitives::{BeaconEngineMessage, NoopInvalidBlockHook};
    use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::BasicEngineValidator};
    use reth_ethereum_consensus::EthBeaconConsensus;
    use reth_ethereum_engine_primitives::EthEngineTypes;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_exex_types::FinishedExExHeight;
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_node_ethereum::EthereumEngineValidator;
    use reth_primitives_traits::SealedHeader;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_prune::Pruner;
    use reth_tasks::TokioTaskExecutor;
    use reth_trie_db::ChangesetCache;
    use std::sync::Arc;
    use tokio::sync::{mpsc::unbounded_channel, watch};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    /// Smoke test: an `EngineService` can be constructed from Ethereum test components.
    ///
    /// Nothing is polled; the test only checks that construction completes.
    #[test]
    fn eth_chain_orchestrator_build() {
        // Mainnet genesis with Paris activated.
        let spec_builder = ChainSpecBuilder::default()
            .chain(MAINNET.chain)
            .genesis(MAINNET.genesis.clone())
            .paris_activated();
        let chain_spec = Arc::new(spec_builder.build());
        let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));

        let client = TestFullBlockClient::default();

        // The sender is bound (not dropped) so the engine message channel stays open.
        let (_engine_tx, engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
        let incoming_requests = UnboundedReceiverStream::new(engine_rx);

        let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
        let pipeline_task_spawner = Box::new(TokioTaskExecutor::default());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());

        let blockchain_db =
            BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
                .unwrap();
        let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());

        // The sender is bound (not dropped) for the lifetime of the test.
        let (_exex_tx, exex_rx) = watch::channel(FinishedExExHeight::NoExExs);
        let pruner =
            Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, exex_rx);
        let evm_config = EthEvmConfig::new(chain_spec.clone());

        let changeset_cache = ChangesetCache::new();

        let engine_validator = BasicEngineValidator::new(
            blockchain_db.clone(),
            consensus.clone(),
            evm_config.clone(),
            engine_payload_validator,
            TreeConfig::default(),
            Box::new(NoopInvalidBlockHook::default()),
            changeset_cache.clone(),
            reth_tasks::Runtime::test(),
        );

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        let (payload_tx, _payload_rx) = unbounded_channel();
        let payload_builder = PayloadBuilderHandle::new(payload_tx);

        let _eth_service = EngineService::new(
            consensus,
            chain_spec,
            client,
            Box::pin(incoming_requests),
            pipeline,
            pipeline_task_spawner,
            provider_factory,
            blockchain_db,
            pruner,
            payload_builder,
            engine_validator,
            TreeConfig::default(),
            sync_metrics_tx,
            evm_config,
            changeset_cache,
        );
    }
}