reth_engine_service/
service.rs

1use futures::{Stream, StreamExt};
2use pin_project::pin_project;
3use reth_chainspec::EthChainSpec;
4use reth_consensus::{ConsensusError, FullConsensus};
5use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineValidator};
6use reth_engine_tree::{
7    backfill::PipelineSync,
8    download::BasicBlockDownloader,
9    engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
10    persistence::PersistenceHandle,
11    tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
12};
13pub use reth_engine_tree::{
14    chain::{ChainEvent, ChainOrchestrator},
15    engine::EngineApiEvent,
16};
17use reth_ethereum_primitives::EthPrimitives;
18use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
19use reth_network_p2p::BlockClient;
20use reth_node_types::{BlockTy, NodeTypes, NodeTypesWithEngine};
21use reth_payload_builder::PayloadBuilderHandle;
22use reth_provider::{
23    providers::{BlockchainProvider, EngineNodeTypes},
24    ProviderFactory,
25};
26use reth_prune::PrunerWithFactory;
27use reth_stages_api::{MetricEventsSender, Pipeline};
28use reth_tasks::TaskSpawner;
29use std::{
30    marker::PhantomData,
31    pin::Pin,
32    sync::Arc,
33    task::{Context, Poll},
34};
35
36/// Alias for consensus engine stream.
37pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
38
39/// Alias for chain orchestrator.
40type EngineServiceType<N, Client> = ChainOrchestrator<
41    EngineHandler<
42        EngineApiRequestHandler<
43            EngineApiRequest<<N as NodeTypesWithEngine>::Engine, <N as NodeTypes>::Primitives>,
44            <N as NodeTypes>::Primitives,
45        >,
46        EngineMessageStream<<N as NodeTypesWithEngine>::Engine>,
47        BasicBlockDownloader<Client, BlockTy<N>>,
48    >,
49    PipelineSync<N>,
50>;
51
52/// The type that drives the chain forward and communicates progress.
53#[pin_project]
54#[allow(missing_debug_implementations)]
55// TODO(mattsse): remove hidde once fixed : <https://github.com/rust-lang/rust/issues/135363>
56//  otherwise rustdoc fails to resolve the alias
57#[doc(hidden)]
58pub struct EngineService<N, Client, E>
59where
60    N: EngineNodeTypes,
61    Client: BlockClient<Block = BlockTy<N>> + 'static,
62    E: BlockExecutorProvider + 'static,
63{
64    orchestrator: EngineServiceType<N, Client>,
65    _marker: PhantomData<E>,
66}
67
68impl<N, Client, E> EngineService<N, Client, E>
69where
70    N: EngineNodeTypes,
71    Client: BlockClient<Block = BlockTy<N>> + 'static,
72    E: BlockExecutorProvider<Primitives = N::Primitives> + 'static,
73{
74    /// Constructor for `EngineService`.
75    #[allow(clippy::too_many_arguments)]
76    pub fn new<V, C>(
77        consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
78        executor_factory: E,
79        chain_spec: Arc<N::ChainSpec>,
80        client: Client,
81        incoming_requests: EngineMessageStream<N::Engine>,
82        pipeline: Pipeline<N>,
83        pipeline_task_spawner: Box<dyn TaskSpawner>,
84        provider: ProviderFactory<N>,
85        blockchain_db: BlockchainProvider<N>,
86        pruner: PrunerWithFactory<ProviderFactory<N>>,
87        payload_builder: PayloadBuilderHandle<N::Engine>,
88        payload_validator: V,
89        tree_config: TreeConfig,
90        invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
91        sync_metrics_tx: MetricEventsSender,
92        evm_config: C,
93    ) -> Self
94    where
95        V: EngineValidator<N::Engine, Block = BlockTy<N>>,
96        C: ConfigureEvm<Primitives = N::Primitives> + 'static,
97    {
98        let engine_kind =
99            if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
100
101        let downloader = BasicBlockDownloader::new(client, consensus.clone().as_consensus());
102
103        let persistence_handle =
104            PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
105
106        let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
107
108        let (to_tree_tx, from_tree) =
109            EngineApiTreeHandler::<N::Primitives, _, _, _, _, _>::spawn_new(
110                blockchain_db,
111                executor_factory,
112                consensus,
113                payload_validator,
114                persistence_handle,
115                payload_builder,
116                canonical_in_memory_state,
117                tree_config,
118                invalid_block_hook,
119                engine_kind,
120                evm_config,
121            );
122
123        let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
124        let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
125
126        let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
127
128        Self {
129            orchestrator: ChainOrchestrator::new(handler, backfill_sync),
130            _marker: Default::default(),
131        }
132    }
133
134    /// Returns a mutable reference to the orchestrator.
135    pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
136        &mut self.orchestrator
137    }
138}
139
140impl<N, Client, E> Stream for EngineService<N, Client, E>
141where
142    N: EngineNodeTypes,
143    Client: BlockClient<Block = BlockTy<N>> + 'static,
144    E: BlockExecutorProvider + 'static,
145{
146    type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
147
148    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
149        let mut orchestrator = self.project().orchestrator;
150        StreamExt::poll_next_unpin(&mut orchestrator, cx)
151    }
152}
153
154/// Potential error returned by `EngineService`.
155#[derive(Debug, thiserror::Error)]
156#[error("Engine service error.")]
157pub struct EngineServiceError {}
158
159#[cfg(test)]
160mod tests {
161    use super::*;
162    use reth_chainspec::{ChainSpecBuilder, MAINNET};
163    use reth_engine_primitives::BeaconEngineMessage;
164    use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::NoopInvalidBlockHook};
165    use reth_ethereum_consensus::EthBeaconConsensus;
166    use reth_ethereum_engine_primitives::EthEngineTypes;
167    use reth_evm_ethereum::{execute::EthExecutorProvider, EthEvmConfig};
168    use reth_exex_types::FinishedExExHeight;
169    use reth_network_p2p::test_utils::TestFullBlockClient;
170    use reth_node_ethereum::EthereumEngineValidator;
171    use reth_primitives_traits::SealedHeader;
172    use reth_provider::{
173        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
174    };
175    use reth_prune::Pruner;
176    use reth_tasks::TokioTaskExecutor;
177    use std::sync::Arc;
178    use tokio::sync::{mpsc::unbounded_channel, watch};
179    use tokio_stream::wrappers::UnboundedReceiverStream;
180
181    #[test]
182    fn eth_chain_orchestrator_build() {
183        let chain_spec = Arc::new(
184            ChainSpecBuilder::default()
185                .chain(MAINNET.chain)
186                .genesis(MAINNET.genesis.clone())
187                .paris_activated()
188                .build(),
189        );
190        let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
191
192        let client = TestFullBlockClient::default();
193
194        let (_tx, rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
195        let incoming_requests = UnboundedReceiverStream::new(rx);
196
197        let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
198        let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
199        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
200
201        let executor_factory = EthExecutorProvider::ethereum(chain_spec.clone());
202        let blockchain_db =
203            BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
204                .unwrap();
205        let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());
206        let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
207        let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
208        let evm_config = EthEvmConfig::new(chain_spec.clone());
209
210        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
211        let (tx, _rx) = unbounded_channel();
212        let _eth_service = EngineService::new(
213            consensus,
214            executor_factory,
215            chain_spec,
216            client,
217            Box::pin(incoming_requests),
218            pipeline,
219            pipeline_task_spawner,
220            provider_factory,
221            blockchain_db,
222            pruner,
223            PayloadBuilderHandle::new(tx),
224            engine_payload_validator,
225            TreeConfig::default(),
226            Box::new(NoopInvalidBlockHook::default()),
227            sync_metrics_tx,
228            evm_config,
229        );
230    }
231}