reth_engine_tree/
engine.rs

1//! An engine API handler for the chain.
2
3use crate::{
4    backfill::BackfillAction,
5    chain::{ChainHandler, FromOrchestrator, HandlerEvent},
6    download::{BlockDownloader, DownloadAction, DownloadOutcome},
7};
8use alloy_primitives::B256;
9use futures::{Stream, StreamExt};
10use reth_chain_state::ExecutedBlockWithTrieUpdates;
11use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineTypes};
12use reth_ethereum_primitives::EthPrimitives;
13use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
14use std::{
15    collections::HashSet,
16    fmt::Display,
17    sync::mpsc::Sender,
18    task::{ready, Context, Poll},
19};
20use tokio::sync::mpsc::UnboundedReceiver;
21
/// A [`ChainHandler`] that advances the chain based on incoming requests (CL engine API).
///
/// This is a general purpose request handler with network access: it listens for incoming
/// messages and hands them to the configured request handler.
///
/// ## Overview
///
/// This type orchestrates incoming messages and is responsible for delegating requests received
/// from the CL to the handler.
///
/// Its responsibilities are:
/// - Delegating incoming requests to the [`EngineRequestHandler`].
/// - Advancing the [`EngineRequestHandler`] by polling it and emitting events.
/// - Downloading blocks on demand from the network if requested by the [`EngineRequestHandler`].
///
/// The core logic lives in the [`EngineRequestHandler`], which is responsible for processing the
/// incoming requests.
#[derive(Debug)]
pub struct EngineHandler<T, S, D> {
    /// The request handler, i.e. the component that actually processes incoming requests.
    handler: T,
    /// Source of incoming requests (from the engine API endpoint) that still need processing.
    incoming_requests: S,
    /// Downloader used to fetch blocks on demand.
    downloader: D,
}
50
51impl<T, S, D> EngineHandler<T, S, D> {
52    /// Creates a new [`EngineHandler`] with the given handler and downloader and incoming stream of
53    /// requests.
54    pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self
55    where
56        T: EngineRequestHandler,
57    {
58        Self { handler, incoming_requests, downloader }
59    }
60
61    /// Returns a mutable reference to the request handler.
62    pub fn handler_mut(&mut self) -> &mut T {
63        &mut self.handler
64    }
65}
66
67impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
68where
69    T: EngineRequestHandler<Block = D::Block>,
70    S: Stream + Send + Sync + Unpin + 'static,
71    <S as Stream>::Item: Into<T::Request>,
72    D: BlockDownloader,
73{
74    type Event = T::Event;
75
76    fn on_event(&mut self, event: FromOrchestrator) {
77        // delegate event to the handler
78        self.handler.on_event(event.into());
79    }
80
81    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>> {
82        loop {
83            // drain the handler first
84            while let Poll::Ready(ev) = self.handler.poll(cx) {
85                match ev {
86                    RequestHandlerEvent::HandlerEvent(ev) => {
87                        return match ev {
88                            HandlerEvent::BackfillAction(target) => {
89                                // bubble up backfill sync request request
90                                self.downloader.on_action(DownloadAction::Clear);
91                                Poll::Ready(HandlerEvent::BackfillAction(target))
92                            }
93                            HandlerEvent::Event(ev) => {
94                                // bubble up the event
95                                Poll::Ready(HandlerEvent::Event(ev))
96                            }
97                            HandlerEvent::FatalError => Poll::Ready(HandlerEvent::FatalError),
98                        }
99                    }
100                    RequestHandlerEvent::Download(req) => {
101                        // delegate download request to the downloader
102                        self.downloader.on_action(DownloadAction::Download(req));
103                    }
104                }
105            }
106
107            // pop the next incoming request
108            if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
109                // and delegate the request to the handler
110                self.handler.on_event(FromEngine::Request(req.into()));
111                // skip downloading in this iteration to allow the handler to process the request
112                continue
113            }
114
115            // advance the downloader
116            if let Poll::Ready(outcome) = self.downloader.poll(cx) {
117                if let DownloadOutcome::Blocks(blocks) = outcome {
118                    // delegate the downloaded blocks to the handler
119                    self.handler.on_event(FromEngine::DownloadedBlocks(blocks));
120                }
121                continue
122            }
123
124            return Poll::Pending
125        }
126    }
127}
128
129/// A type that processes incoming requests (e.g. requests from the consensus layer, engine API,
130/// such as newPayload).
131///
132/// ## Control flow
133///
134/// Requests and certain updates, such as a change in backfill sync status, are delegated to this
135/// type via [`EngineRequestHandler::on_event`]. This type is responsible for processing the
136/// incoming requests and advancing the chain and emit events when it is polled.
137pub trait EngineRequestHandler: Send + Sync {
138    /// Event type this handler can emit
139    type Event: Send;
140    /// The request type this handler can process.
141    type Request;
142    /// Type of the block sent in [`FromEngine::DownloadedBlocks`] variant.
143    type Block: Block;
144
145    /// Informs the handler about an event from the [`EngineHandler`].
146    fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>);
147
148    /// Advances the handler.
149    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>>;
150}
151
152/// An [`EngineRequestHandler`] that processes engine API requests by delegating to an execution
153/// task.
154///
155/// This type is responsible for advancing the chain during live sync (following the tip of the
156/// chain).
157///
158/// It advances the chain based on received engine API requests by delegating them to the tree
159/// executor.
160///
161/// There are two types of requests that can be processed:
162///
163/// - `on_new_payload`: Executes the payload and inserts it into the tree. These are allowed to be
164///   processed concurrently.
165/// - `on_forkchoice_updated`: Updates the fork choice based on the new head. These require write
166///   access to the database and are skipped if the handler can't acquire exclusive access to the
167///   database.
168///
169/// In case required blocks are missing, the handler will request them from the network, by emitting
170/// a download request upstream.
171#[derive(Debug)]
172pub struct EngineApiRequestHandler<Request, N: NodePrimitives> {
173    /// channel to send messages to the tree to execute the payload.
174    to_tree: Sender<FromEngine<Request, N::Block>>,
175    /// channel to receive messages from the tree.
176    from_tree: UnboundedReceiver<EngineApiEvent<N>>,
177}
178
179impl<Request, N: NodePrimitives> EngineApiRequestHandler<Request, N> {
180    /// Creates a new `EngineApiRequestHandler`.
181    pub const fn new(
182        to_tree: Sender<FromEngine<Request, N::Block>>,
183        from_tree: UnboundedReceiver<EngineApiEvent<N>>,
184    ) -> Self {
185        Self { to_tree, from_tree }
186    }
187}
188
189impl<Request, N: NodePrimitives> EngineRequestHandler for EngineApiRequestHandler<Request, N>
190where
191    Request: Send,
192{
193    type Event = BeaconConsensusEngineEvent<N>;
194    type Request = Request;
195    type Block = N::Block;
196
197    fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>) {
198        // delegate to the tree
199        let _ = self.to_tree.send(event);
200    }
201
202    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
203        let Some(ev) = ready!(self.from_tree.poll_recv(cx)) else {
204            return Poll::Ready(RequestHandlerEvent::HandlerEvent(HandlerEvent::FatalError))
205        };
206
207        let ev = match ev {
208            EngineApiEvent::BeaconConsensus(ev) => {
209                RequestHandlerEvent::HandlerEvent(HandlerEvent::Event(ev))
210            }
211            EngineApiEvent::BackfillAction(action) => {
212                RequestHandlerEvent::HandlerEvent(HandlerEvent::BackfillAction(action))
213            }
214            EngineApiEvent::Download(action) => RequestHandlerEvent::Download(action),
215        };
216        Poll::Ready(ev)
217    }
218}
219
/// The type for specifying the kind of engine api.
///
/// Defaults to [`EngineApiKind::Ethereum`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EngineApiKind {
    /// The chain contains Ethereum configuration.
    #[default]
    Ethereum,
    /// The chain contains Optimism configuration.
    OpStack,
}

impl EngineApiKind {
    /// Returns `true` if this is the [`EngineApiKind::Ethereum`] variant.
    pub const fn is_ethereum(&self) -> bool {
        matches!(self, Self::Ethereum)
    }

    /// Returns `true` if this is the [`EngineApiKind::OpStack`] variant.
    pub const fn is_opstack(&self) -> bool {
        matches!(self, Self::OpStack)
    }
}
241
242/// The request variants that the engine API handler can receive.
243#[derive(Debug)]
244pub enum EngineApiRequest<T: EngineTypes, N: NodePrimitives> {
245    /// A request received from the consensus engine.
246    Beacon(BeaconEngineMessage<T>),
247    /// Request to insert an already executed block, e.g. via payload building.
248    InsertExecutedBlock(ExecutedBlockWithTrieUpdates<N>),
249}
250
251impl<T: EngineTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        match self {
254            Self::Beacon(msg) => msg.fmt(f),
255            Self::InsertExecutedBlock(block) => {
256                write!(f, "InsertExecutedBlock({:?})", block.recovered_block().num_hash())
257            }
258        }
259    }
260}
261
262impl<T: EngineTypes, N: NodePrimitives> From<BeaconEngineMessage<T>> for EngineApiRequest<T, N> {
263    fn from(msg: BeaconEngineMessage<T>) -> Self {
264        Self::Beacon(msg)
265    }
266}
267
268impl<T: EngineTypes, N: NodePrimitives> From<EngineApiRequest<T, N>>
269    for FromEngine<EngineApiRequest<T, N>, N::Block>
270{
271    fn from(req: EngineApiRequest<T, N>) -> Self {
272        Self::Request(req)
273    }
274}
275
276/// Events emitted by the engine API handler.
277#[derive(Debug)]
278pub enum EngineApiEvent<N: NodePrimitives = EthPrimitives> {
279    /// Event from the consensus engine.
280    // TODO(mattsse): find a more appropriate name for this variant, consider phasing it out.
281    BeaconConsensus(BeaconConsensusEngineEvent<N>),
282    /// Backfill action is needed.
283    BackfillAction(BackfillAction),
284    /// Block download is needed.
285    Download(DownloadRequest),
286}
287
288impl<N: NodePrimitives> EngineApiEvent<N> {
289    /// Returns `true` if the event is a backfill action.
290    pub const fn is_backfill_action(&self) -> bool {
291        matches!(self, Self::BackfillAction(_))
292    }
293}
294
295impl<N: NodePrimitives> From<BeaconConsensusEngineEvent<N>> for EngineApiEvent<N> {
296    fn from(event: BeaconConsensusEngineEvent<N>) -> Self {
297        Self::BeaconConsensus(event)
298    }
299}
300
301/// Events received from the engine.
302#[derive(Debug)]
303pub enum FromEngine<Req, B: Block> {
304    /// Event from the top level orchestrator.
305    Event(FromOrchestrator),
306    /// Request from the engine.
307    Request(Req),
308    /// Downloaded blocks from the network.
309    DownloadedBlocks(Vec<RecoveredBlock<B>>),
310}
311
312impl<Req: Display, B: Block> Display for FromEngine<Req, B> {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        match self {
315            Self::Event(ev) => write!(f, "Event({ev:?})"),
316            Self::Request(req) => write!(f, "Request({req})"),
317            Self::DownloadedBlocks(blocks) => {
318                write!(f, "DownloadedBlocks({} blocks)", blocks.len())
319            }
320        }
321    }
322}
323
324impl<Req, B: Block> From<FromOrchestrator> for FromEngine<Req, B> {
325    fn from(event: FromOrchestrator) -> Self {
326        Self::Event(event)
327    }
328}
329
330/// Requests produced by a [`EngineRequestHandler`].
331#[derive(Debug)]
332pub enum RequestHandlerEvent<T> {
333    /// An event emitted by the handler.
334    HandlerEvent(HandlerEvent<T>),
335    /// Request to download blocks.
336    Download(DownloadRequest),
337}
338
339/// A request to download blocks from the network.
340#[derive(Debug)]
341pub enum DownloadRequest {
342    /// Download the given set of blocks.
343    BlockSet(HashSet<B256>),
344    /// Download the given range of blocks.
345    BlockRange(B256, u64),
346}
347
348impl DownloadRequest {
349    /// Returns a [`DownloadRequest`] for a single block.
350    pub fn single_block(hash: B256) -> Self {
351        Self::BlockSet(HashSet::from([hash]))
352    }
353}