reth_engine_local/
service.rs

1//! Provides a local dev service engine that can be used to run a dev chain.
2//!
3//! [`LocalEngineService`] polls the payload builder based on a mining mode
4//! which can be set to `Instant` or `Interval`. The `Instant` mode will
5//! constantly poll the payload builder and initiate block building
6//! with a single transaction. The `Interval` mode will initiate block
7//! building at a fixed interval.
8
9use core::fmt;
10use std::{
11    fmt::{Debug, Formatter},
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16
17use crate::miner::{LocalMiner, MiningMode};
18use futures_util::{Stream, StreamExt};
19use reth_chainspec::EthChainSpec;
20use reth_consensus::{ConsensusError, FullConsensus};
21use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineValidator};
22use reth_engine_service::service::EngineMessageStream;
23use reth_engine_tree::{
24    chain::{ChainEvent, HandlerEvent},
25    engine::{
26        EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine,
27        RequestHandlerEvent,
28    },
29    persistence::PersistenceHandle,
30    tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
31};
32use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
33use reth_node_types::BlockTy;
34use reth_payload_builder::PayloadBuilderHandle;
35use reth_payload_primitives::{PayloadAttributesBuilder, PayloadTypes};
36use reth_provider::{
37    providers::{BlockchainProvider, EngineNodeTypes},
38    ChainSpecProvider, ProviderFactory,
39};
40use reth_prune::PrunerWithFactory;
41use reth_stages_api::MetricEventsSender;
42use tokio::sync::mpsc::UnboundedSender;
43use tracing::error;
44
45/// Provides a local dev service engine that can be used to drive the
46/// chain forward.
47///
48/// This service both produces and consumes [`BeaconEngineMessage`]s. This is done to allow
49/// modifications of the stream
50pub struct LocalEngineService<N>
51where
52    N: EngineNodeTypes,
53{
54    /// Processes requests.
55    ///
56    /// This type is responsible for processing incoming requests.
57    handler: EngineApiRequestHandler<EngineApiRequest<N::Engine, N::Primitives>, N::Primitives>,
58    /// Receiver for incoming requests (from the engine API endpoint) that need to be processed.
59    incoming_requests: EngineMessageStream<N::Engine>,
60}
61
62impl<N> LocalEngineService<N>
63where
64    N: EngineNodeTypes,
65{
66    /// Constructor for [`LocalEngineService`].
67    #[allow(clippy::too_many_arguments)]
68    pub fn new<B, V, C>(
69        consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
70        executor_factory: impl BlockExecutorProvider<Primitives = N::Primitives>,
71        provider: ProviderFactory<N>,
72        blockchain_db: BlockchainProvider<N>,
73        pruner: PrunerWithFactory<ProviderFactory<N>>,
74        payload_builder: PayloadBuilderHandle<N::Engine>,
75        payload_validator: V,
76        tree_config: TreeConfig,
77        invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
78        sync_metrics_tx: MetricEventsSender,
79        to_engine: UnboundedSender<BeaconEngineMessage<N::Engine>>,
80        from_engine: EngineMessageStream<N::Engine>,
81        mode: MiningMode,
82        payload_attributes_builder: B,
83        evm_config: C,
84    ) -> Self
85    where
86        B: PayloadAttributesBuilder<<N::Engine as PayloadTypes>::PayloadAttributes>,
87        V: EngineValidator<N::Engine, Block = BlockTy<N>>,
88        C: ConfigureEvm<Primitives = N::Primitives> + 'static,
89    {
90        let chain_spec = provider.chain_spec();
91        let engine_kind =
92            if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
93
94        let persistence_handle =
95            PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
96        let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
97
98        let (to_tree_tx, from_tree) =
99            EngineApiTreeHandler::<N::Primitives, _, _, _, _, _>::spawn_new(
100                blockchain_db.clone(),
101                executor_factory,
102                consensus,
103                payload_validator,
104                persistence_handle,
105                payload_builder.clone(),
106                canonical_in_memory_state,
107                tree_config,
108                invalid_block_hook,
109                engine_kind,
110                evm_config,
111            );
112
113        let handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
114
115        LocalMiner::spawn_new(
116            blockchain_db,
117            payload_attributes_builder,
118            to_engine,
119            mode,
120            payload_builder,
121        );
122
123        Self { handler, incoming_requests: from_engine }
124    }
125}
126
127impl<N> Stream for LocalEngineService<N>
128where
129    N: EngineNodeTypes,
130{
131    type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
132
133    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
134        let this = self.get_mut();
135
136        if let Poll::Ready(ev) = this.handler.poll(cx) {
137            return match ev {
138                RequestHandlerEvent::HandlerEvent(ev) => match ev {
139                    HandlerEvent::BackfillAction(_) => {
140                        error!(target: "engine::local", "received backfill request in local engine");
141                        Poll::Ready(Some(ChainEvent::FatalError))
142                    }
143                    HandlerEvent::Event(ev) => Poll::Ready(Some(ChainEvent::Handler(ev))),
144                    HandlerEvent::FatalError => Poll::Ready(Some(ChainEvent::FatalError)),
145                },
146                RequestHandlerEvent::Download(_) => {
147                    error!(target: "engine::local", "received download request in local engine");
148                    Poll::Ready(Some(ChainEvent::FatalError))
149                }
150            }
151        }
152
153        // forward incoming requests to the handler
154        while let Poll::Ready(Some(req)) = this.incoming_requests.poll_next_unpin(cx) {
155            this.handler.on_event(FromEngine::Request(req.into()));
156        }
157
158        Poll::Pending
159    }
160}
161
162impl<N: EngineNodeTypes> Debug for LocalEngineService<N> {
163    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
164        f.debug_struct("LocalEngineService").finish_non_exhaustive()
165    }
166}