reth_engine_tree/
engine.rs

1//! An engine API handler for the chain.
2
3use crate::{
4    backfill::BackfillAction,
5    chain::{ChainHandler, FromOrchestrator, HandlerEvent},
6    download::{BlockDownloader, DownloadAction, DownloadOutcome},
7};
8use alloy_primitives::B256;
9use futures::{Stream, StreamExt};
10use reth_chain_state::ExecutedBlockWithTrieUpdates;
11use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage};
12use reth_ethereum_primitives::EthPrimitives;
13use reth_payload_primitives::PayloadTypes;
14use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
15use std::{
16    collections::HashSet,
17    fmt::Display,
18    sync::mpsc::Sender,
19    task::{ready, Context, Poll},
20};
21use tokio::sync::mpsc::UnboundedReceiver;
22
23/// A [`ChainHandler`] that advances the chain based on incoming requests (CL engine API).
24///
25/// This is a general purpose request handler with network access.
26/// This type listens for incoming messages and processes them via the configured request handler.
27///
28/// ## Overview
29///
30/// This type is an orchestrator for incoming messages and responsible for delegating requests
31/// received from the CL to the handler.
32///
33/// It is responsible for handling the following:
34/// - Delegating incoming requests to the [`EngineRequestHandler`].
35/// - Advancing the [`EngineRequestHandler`] by polling it and emitting events.
36/// - Downloading blocks on demand from the network if requested by the [`EngineApiRequestHandler`].
37///
38/// The core logic is part of the [`EngineRequestHandler`], which is responsible for processing the
39/// incoming requests.
40#[derive(Debug)]
41pub struct EngineHandler<T, S, D> {
42    /// Processes requests.
43    ///
44    /// This type is responsible for processing incoming requests.
45    handler: T,
46    /// Receiver for incoming requests (from the engine API endpoint) that need to be processed.
47    incoming_requests: S,
48    /// A downloader to download blocks on demand.
49    downloader: D,
50}
51
52impl<T, S, D> EngineHandler<T, S, D> {
53    /// Creates a new [`EngineHandler`] with the given handler and downloader and incoming stream of
54    /// requests.
55    pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self
56    where
57        T: EngineRequestHandler,
58    {
59        Self { handler, incoming_requests, downloader }
60    }
61
62    /// Returns a mutable reference to the request handler.
63    pub const fn handler_mut(&mut self) -> &mut T {
64        &mut self.handler
65    }
66}
67
68impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
69where
70    T: EngineRequestHandler<Block = D::Block>,
71    S: Stream + Send + Sync + Unpin + 'static,
72    <S as Stream>::Item: Into<T::Request>,
73    D: BlockDownloader,
74{
75    type Event = T::Event;
76
77    fn on_event(&mut self, event: FromOrchestrator) {
78        // delegate event to the handler
79        self.handler.on_event(event.into());
80    }
81
82    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>> {
83        loop {
84            // drain the handler first
85            while let Poll::Ready(ev) = self.handler.poll(cx) {
86                match ev {
87                    RequestHandlerEvent::HandlerEvent(ev) => {
88                        return match ev {
89                            HandlerEvent::BackfillAction(target) => {
90                                // bubble up backfill sync request
91                                self.downloader.on_action(DownloadAction::Clear);
92                                Poll::Ready(HandlerEvent::BackfillAction(target))
93                            }
94                            HandlerEvent::Event(ev) => {
95                                // bubble up the event
96                                Poll::Ready(HandlerEvent::Event(ev))
97                            }
98                            HandlerEvent::FatalError => Poll::Ready(HandlerEvent::FatalError),
99                        }
100                    }
101                    RequestHandlerEvent::Download(req) => {
102                        // delegate download request to the downloader
103                        self.downloader.on_action(DownloadAction::Download(req));
104                    }
105                }
106            }
107
108            // pop the next incoming request
109            if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
110                // and delegate the request to the handler
111                self.handler.on_event(FromEngine::Request(req.into()));
112                // skip downloading in this iteration to allow the handler to process the request
113                continue
114            }
115
116            // advance the downloader
117            if let Poll::Ready(outcome) = self.downloader.poll(cx) {
118                if let DownloadOutcome::Blocks(blocks) = outcome {
119                    // delegate the downloaded blocks to the handler
120                    self.handler.on_event(FromEngine::DownloadedBlocks(blocks));
121                }
122                continue
123            }
124
125            return Poll::Pending
126        }
127    }
128}
129
/// A type that processes incoming requests (e.g. requests from the consensus layer, engine API,
/// such as newPayload).
///
/// ## Control flow
///
/// Requests and certain updates, such as a change in backfill sync status, are delegated to this
/// type via [`EngineRequestHandler::on_event`]. This type is responsible for processing the
/// incoming requests, advancing the chain, and emitting events when it is polled.
pub trait EngineRequestHandler: Send + Sync {
    /// Event type this handler can emit.
    type Event: Send;
    /// The request type this handler can process.
    type Request;
    /// Type of the block sent in [`FromEngine::DownloadedBlocks`] variant.
    type Block: Block;

    /// Informs the handler about an event from the [`EngineHandler`].
    ///
    /// The event is one of the [`FromEngine`] variants: an orchestrator event, an incoming
    /// request, or blocks downloaded from the network.
    fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>);

    /// Advances the handler.
    ///
    /// When ready, yields a [`RequestHandlerEvent`]: either a [`HandlerEvent`] or a
    /// [`DownloadRequest`].
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>>;
}
152
153/// An [`EngineRequestHandler`] that processes engine API requests by delegating to an execution
154/// task.
155///
156/// This type is responsible for advancing the chain during live sync (following the tip of the
157/// chain).
158///
159/// It advances the chain based on received engine API requests by delegating them to the tree
160/// executor.
161///
162/// There are two types of requests that can be processed:
163///
164/// - `on_new_payload`: Executes the payload and inserts it into the tree. These are allowed to be
165///   processed concurrently.
166/// - `on_forkchoice_updated`: Updates the fork choice based on the new head. These require write
167///   access to the database and are skipped if the handler can't acquire exclusive access to the
168///   database.
169///
170/// In case required blocks are missing, the handler will request them from the network, by emitting
171/// a download request upstream.
172#[derive(Debug)]
173pub struct EngineApiRequestHandler<Request, N: NodePrimitives> {
174    /// channel to send messages to the tree to execute the payload.
175    to_tree: Sender<FromEngine<Request, N::Block>>,
176    /// channel to receive messages from the tree.
177    from_tree: UnboundedReceiver<EngineApiEvent<N>>,
178}
179
180impl<Request, N: NodePrimitives> EngineApiRequestHandler<Request, N> {
181    /// Creates a new `EngineApiRequestHandler`.
182    pub const fn new(
183        to_tree: Sender<FromEngine<Request, N::Block>>,
184        from_tree: UnboundedReceiver<EngineApiEvent<N>>,
185    ) -> Self {
186        Self { to_tree, from_tree }
187    }
188}
189
190impl<Request, N: NodePrimitives> EngineRequestHandler for EngineApiRequestHandler<Request, N>
191where
192    Request: Send,
193{
194    type Event = BeaconConsensusEngineEvent<N>;
195    type Request = Request;
196    type Block = N::Block;
197
198    fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>) {
199        // delegate to the tree
200        let _ = self.to_tree.send(event);
201    }
202
203    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
204        let Some(ev) = ready!(self.from_tree.poll_recv(cx)) else {
205            return Poll::Ready(RequestHandlerEvent::HandlerEvent(HandlerEvent::FatalError))
206        };
207
208        let ev = match ev {
209            EngineApiEvent::BeaconConsensus(ev) => {
210                RequestHandlerEvent::HandlerEvent(HandlerEvent::Event(ev))
211            }
212            EngineApiEvent::BackfillAction(action) => {
213                RequestHandlerEvent::HandlerEvent(HandlerEvent::BackfillAction(action))
214            }
215            EngineApiEvent::Download(action) => RequestHandlerEvent::Download(action),
216        };
217        Poll::Ready(ev)
218    }
219}
220
/// The type for specifying the kind of engine api.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EngineApiKind {
    /// The chain contains Ethereum configuration. This is the default kind.
    #[default]
    Ethereum,
    /// The chain contains Optimism configuration.
    OpStack,
}

impl EngineApiKind {
    /// Returns `true` if this is the [`Self::Ethereum`] variant.
    pub const fn is_ethereum(&self) -> bool {
        matches!(self, Self::Ethereum)
    }

    /// Returns `true` if this is the [`Self::OpStack`] variant.
    pub const fn is_opstack(&self) -> bool {
        matches!(self, Self::OpStack)
    }
}
242
/// The request variants that the engine API handler can receive.
///
/// A [`BeaconEngineMessage`] converts into the [`Self::Beacon`] variant via `From`.
#[derive(Debug)]
pub enum EngineApiRequest<T: PayloadTypes, N: NodePrimitives> {
    /// A request received from the consensus engine.
    Beacon(BeaconEngineMessage<T>),
    /// Request to insert an already executed block, e.g. via payload building.
    InsertExecutedBlock(ExecutedBlockWithTrieUpdates<N>),
}
251
252impl<T: PayloadTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        match self {
255            Self::Beacon(msg) => msg.fmt(f),
256            Self::InsertExecutedBlock(block) => {
257                write!(f, "InsertExecutedBlock({:?})", block.recovered_block().num_hash())
258            }
259        }
260    }
261}
262
263impl<T: PayloadTypes, N: NodePrimitives> From<BeaconEngineMessage<T>> for EngineApiRequest<T, N> {
264    fn from(msg: BeaconEngineMessage<T>) -> Self {
265        Self::Beacon(msg)
266    }
267}
268
269impl<T: PayloadTypes, N: NodePrimitives> From<EngineApiRequest<T, N>>
270    for FromEngine<EngineApiRequest<T, N>, N::Block>
271{
272    fn from(req: EngineApiRequest<T, N>) -> Self {
273        Self::Request(req)
274    }
275}
276
277/// Events emitted by the engine API handler.
278#[derive(Debug)]
279pub enum EngineApiEvent<N: NodePrimitives = EthPrimitives> {
280    /// Event from the consensus engine.
281    // TODO(mattsse): find a more appropriate name for this variant, consider phasing it out.
282    BeaconConsensus(BeaconConsensusEngineEvent<N>),
283    /// Backfill action is needed.
284    BackfillAction(BackfillAction),
285    /// Block download is needed.
286    Download(DownloadRequest),
287}
288
289impl<N: NodePrimitives> EngineApiEvent<N> {
290    /// Returns `true` if the event is a backfill action.
291    pub const fn is_backfill_action(&self) -> bool {
292        matches!(self, Self::BackfillAction(_))
293    }
294}
295
296impl<N: NodePrimitives> From<BeaconConsensusEngineEvent<N>> for EngineApiEvent<N> {
297    fn from(event: BeaconConsensusEngineEvent<N>) -> Self {
298        Self::BeaconConsensus(event)
299    }
300}
301
/// Messages passed to an [`EngineRequestHandler`] via [`EngineRequestHandler::on_event`].
#[derive(Debug)]
pub enum FromEngine<Req, B: Block> {
    /// Event from the top level orchestrator.
    Event(FromOrchestrator),
    /// Request from the engine.
    Request(Req),
    /// Downloaded blocks from the network.
    DownloadedBlocks(Vec<RecoveredBlock<B>>),
}
312
313impl<Req: Display, B: Block> Display for FromEngine<Req, B> {
314    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315        match self {
316            Self::Event(ev) => write!(f, "Event({ev:?})"),
317            Self::Request(req) => write!(f, "Request({req})"),
318            Self::DownloadedBlocks(blocks) => {
319                write!(f, "DownloadedBlocks({} blocks)", blocks.len())
320            }
321        }
322    }
323}
324
325impl<Req, B: Block> From<FromOrchestrator> for FromEngine<Req, B> {
326    fn from(event: FromOrchestrator) -> Self {
327        Self::Event(event)
328    }
329}
330
/// Events produced by an [`EngineRequestHandler`] when it is polled.
#[derive(Debug)]
pub enum RequestHandlerEvent<T> {
    /// An event emitted by the handler.
    HandlerEvent(HandlerEvent<T>),
    /// Request to download blocks.
    Download(DownloadRequest),
}
339
340/// A request to download blocks from the network.
341#[derive(Debug)]
342pub enum DownloadRequest {
343    /// Download the given set of blocks.
344    BlockSet(HashSet<B256>),
345    /// Download the given range of blocks.
346    BlockRange(B256, u64),
347}
348
349impl DownloadRequest {
350    /// Returns a [`DownloadRequest`] for a single block.
351    pub fn single_block(hash: B256) -> Self {
352        Self::BlockSet(HashSet::from([hash]))
353    }
354}