Skip to main content

reth_engine_tree/
engine.rs

//! An engine API handler for the chain.
2
3use crate::{
4    backfill::BackfillAction,
5    chain::{ChainHandler, FromOrchestrator, HandlerEvent},
6    download::{BlockDownloader, DownloadAction, DownloadOutcome},
7};
8use alloy_primitives::{map::B256Set, B256};
9use crossbeam_channel::Sender;
10use futures::{Stream, StreamExt};
11use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
12use reth_ethereum_primitives::EthPrimitives;
13use reth_payload_primitives::{BuiltPayloadExecutedBlock, PayloadTypes};
14use reth_primitives_traits::{Block, NodePrimitives, SealedBlock};
15use std::{
16    fmt::Display,
17    task::{ready, Context, Poll},
18};
19use tokio::sync::mpsc::UnboundedReceiver;
20
/// A [`ChainHandler`] that advances the chain by feeding incoming requests (CL engine API) to a
/// request handler and driving that handler forward.
///
/// This is a general purpose request handler with network access.
///
/// ## Overview
///
/// This type sits between the source of incoming messages and the component that actually
/// processes them. It owns three collaborators:
///
/// - a request handler (see [`EngineRequestHandler`]) that contains the core processing logic,
/// - a stream of incoming requests that are delegated to the handler,
/// - a block downloader that fetches blocks from the network on demand.
///
/// When polled it is responsible for:
/// - advancing the [`EngineRequestHandler`] and bubbling up the events it emits,
/// - passing download requests emitted by the [`EngineRequestHandler`] on to the downloader,
/// - handing new incoming requests and downloaded blocks back to the [`EngineRequestHandler`].
#[derive(Debug)]
pub struct EngineHandler<T, S, D> {
    /// The request handler; all processing of incoming requests happens in here.
    handler: T,
    /// Source of incoming requests (from the engine API endpoint) that still need processing.
    incoming_requests: S,
    /// Downloads blocks on demand.
    downloader: D,
}
49
50impl<T, S, D> EngineHandler<T, S, D> {
51    /// Creates a new [`EngineHandler`] with the given handler and downloader and incoming stream of
52    /// requests.
53    pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self
54    where
55        T: EngineRequestHandler,
56    {
57        Self { handler, incoming_requests, downloader }
58    }
59
60    /// Returns a mutable reference to the request handler.
61    pub const fn handler_mut(&mut self) -> &mut T {
62        &mut self.handler
63    }
64}
65
66impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
67where
68    T: EngineRequestHandler<Block = D::Block>,
69    S: Stream + Send + Sync + Unpin + 'static,
70    <S as Stream>::Item: Into<T::Request>,
71    D: BlockDownloader,
72{
73    type Event = T::Event;
74
75    fn on_event(&mut self, event: FromOrchestrator) {
76        // delegate event to the handler
77        self.handler.on_event(event.into());
78    }
79
80    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>> {
81        loop {
82            // drain the handler first
83            while let Poll::Ready(ev) = self.handler.poll(cx) {
84                match ev {
85                    RequestHandlerEvent::HandlerEvent(ev) => {
86                        return match ev {
87                            HandlerEvent::BackfillAction(target) => {
88                                // bubble up backfill sync request
89                                self.downloader.on_action(DownloadAction::Clear);
90                                Poll::Ready(HandlerEvent::BackfillAction(target))
91                            }
92                            HandlerEvent::Event(ev) => {
93                                // bubble up the event
94                                Poll::Ready(HandlerEvent::Event(ev))
95                            }
96                            HandlerEvent::FatalError => Poll::Ready(HandlerEvent::FatalError),
97                        }
98                    }
99                    RequestHandlerEvent::Download(req) => {
100                        // delegate download request to the downloader
101                        self.downloader.on_action(DownloadAction::Download(req));
102                    }
103                }
104            }
105
106            // pop the next incoming request
107            if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
108                // and delegate the request to the handler
109                self.handler.on_event(FromEngine::Request(req.into()));
110                // skip downloading in this iteration to allow the handler to process the request
111                continue
112            }
113
114            // advance the downloader
115            if let Poll::Ready(outcome) = self.downloader.poll(cx) {
116                if let DownloadOutcome::Blocks(blocks) = outcome {
117                    // delegate the downloaded blocks to the handler
118                    self.handler.on_event(FromEngine::DownloadedBlocks(blocks));
119                }
120                continue
121            }
122
123            return Poll::Pending
124        }
125    }
126}
127
128/// A type that processes incoming requests (e.g. requests from the consensus layer, engine API,
129/// such as newPayload).
130///
131/// ## Control flow
132///
133/// Requests and certain updates, such as a change in backfill sync status, are delegated to this
134/// type via [`EngineRequestHandler::on_event`]. This type is responsible for processing the
135/// incoming requests and advancing the chain and emit events when it is polled.
136pub trait EngineRequestHandler: Send + Sync {
137    /// Event type this handler can emit
138    type Event: Send;
139    /// The request type this handler can process.
140    type Request;
141    /// Type of the block sent in [`FromEngine::DownloadedBlocks`] variant.
142    type Block: Block;
143
144    /// Informs the handler about an event from the [`EngineHandler`].
145    fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>);
146
147    /// Advances the handler.
148    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>>;
149}
150
151/// An [`EngineRequestHandler`] that processes engine API requests by delegating to an execution
152/// task.
153///
154/// This type is responsible for advancing the chain during live sync (following the tip of the
155/// chain).
156///
157/// It advances the chain based on received engine API requests by delegating them to the tree
158/// executor.
159///
160/// There are two types of requests that can be processed:
161///
162/// - `on_new_payload`: Executes the payload and inserts it into the tree. These are allowed to be
163///   processed concurrently.
164/// - `on_forkchoice_updated`: Updates the fork choice based on the new head. These require write
165///   access to the database and are skipped if the handler can't acquire exclusive access to the
166///   database.
167///
168/// In case required blocks are missing, the handler will request them from the network, by emitting
169/// a download request upstream.
170#[derive(Debug)]
171pub struct EngineApiRequestHandler<Request, N: NodePrimitives> {
172    /// channel to send messages to the tree to execute the payload.
173    to_tree: Sender<FromEngine<Request, N::Block>>,
174    /// channel to receive messages from the tree.
175    from_tree: UnboundedReceiver<EngineApiEvent<N>>,
176}
177
178impl<Request, N: NodePrimitives> EngineApiRequestHandler<Request, N> {
179    /// Creates a new `EngineApiRequestHandler`.
180    pub const fn new(
181        to_tree: Sender<FromEngine<Request, N::Block>>,
182        from_tree: UnboundedReceiver<EngineApiEvent<N>>,
183    ) -> Self {
184        Self { to_tree, from_tree }
185    }
186}
187
188impl<Request, N: NodePrimitives> EngineRequestHandler for EngineApiRequestHandler<Request, N>
189where
190    Request: Send,
191{
192    type Event = ConsensusEngineEvent<N>;
193    type Request = Request;
194    type Block = N::Block;
195
196    fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>) {
197        // delegate to the tree
198        let _ = self.to_tree.send(event);
199    }
200
201    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
202        let Some(ev) = ready!(self.from_tree.poll_recv(cx)) else {
203            return Poll::Ready(RequestHandlerEvent::HandlerEvent(HandlerEvent::FatalError))
204        };
205
206        let ev = match ev {
207            EngineApiEvent::BeaconConsensus(ev) => {
208                RequestHandlerEvent::HandlerEvent(HandlerEvent::Event(ev))
209            }
210            EngineApiEvent::BackfillAction(action) => {
211                RequestHandlerEvent::HandlerEvent(HandlerEvent::BackfillAction(action))
212            }
213            EngineApiEvent::Download(action) => RequestHandlerEvent::Download(action),
214        };
215        Poll::Ready(ev)
216    }
217}
218
/// The type for specifying the kind of engine api.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EngineApiKind {
    /// The chain contains Ethereum configuration.
    ///
    /// This is the [`Default`] variant.
    #[default]
    Ethereum,
    /// The chain contains Optimism configuration.
    OpStack,
}

impl EngineApiKind {
    /// Returns `true` if this is the [`EngineApiKind::Ethereum`] variant.
    pub const fn is_ethereum(&self) -> bool {
        matches!(self, Self::Ethereum)
    }

    /// Returns `true` if this is the [`EngineApiKind::OpStack`] variant.
    pub const fn is_opstack(&self) -> bool {
        matches!(self, Self::OpStack)
    }
}
240
241/// The request variants that the engine API handler can receive.
242#[derive(Debug)]
243pub enum EngineApiRequest<T: PayloadTypes, N: NodePrimitives> {
244    /// A request received from the consensus engine.
245    Beacon(BeaconEngineMessage<T>),
246    /// Request to insert an already executed block, e.g. via payload building.
247    InsertExecutedBlock(BuiltPayloadExecutedBlock<N>),
248}
249
250impl<T: PayloadTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        match self {
253            Self::Beacon(msg) => msg.fmt(f),
254            Self::InsertExecutedBlock(payload) => {
255                write!(f, "InsertExecutedBlock({:?})", payload.recovered_block.num_hash())
256            }
257        }
258    }
259}
260
261impl<T: PayloadTypes, N: NodePrimitives> From<BeaconEngineMessage<T>> for EngineApiRequest<T, N> {
262    fn from(msg: BeaconEngineMessage<T>) -> Self {
263        Self::Beacon(msg)
264    }
265}
266
267impl<T: PayloadTypes, N: NodePrimitives> From<EngineApiRequest<T, N>>
268    for FromEngine<EngineApiRequest<T, N>, N::Block>
269{
270    fn from(req: EngineApiRequest<T, N>) -> Self {
271        Self::Request(req)
272    }
273}
274
275/// Events emitted by the engine API handler.
276#[derive(Debug)]
277pub enum EngineApiEvent<N: NodePrimitives = EthPrimitives> {
278    /// Event from the consensus engine.
279    // TODO(mattsse): find a more appropriate name for this variant, consider phasing it out.
280    BeaconConsensus(ConsensusEngineEvent<N>),
281    /// Backfill action is needed.
282    BackfillAction(BackfillAction),
283    /// Block download is needed.
284    Download(DownloadRequest),
285}
286
287impl<N: NodePrimitives> EngineApiEvent<N> {
288    /// Returns `true` if the event is a backfill action.
289    pub const fn is_backfill_action(&self) -> bool {
290        matches!(self, Self::BackfillAction(_))
291    }
292}
293
294impl<N: NodePrimitives> From<ConsensusEngineEvent<N>> for EngineApiEvent<N> {
295    fn from(event: ConsensusEngineEvent<N>) -> Self {
296        Self::BeaconConsensus(event)
297    }
298}
299
300/// Events received from the engine.
301#[derive(Debug)]
302pub enum FromEngine<Req, B: Block> {
303    /// Event from the top level orchestrator.
304    Event(FromOrchestrator),
305    /// Request from the engine.
306    Request(Req),
307    /// Downloaded blocks from the network.
308    DownloadedBlocks(Vec<SealedBlock<B>>),
309}
310
311impl<Req: Display, B: Block> Display for FromEngine<Req, B> {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        match self {
314            Self::Event(ev) => write!(f, "Event({ev:?})"),
315            Self::Request(req) => write!(f, "Request({req})"),
316            Self::DownloadedBlocks(blocks) => {
317                write!(f, "DownloadedBlocks({} blocks)", blocks.len())
318            }
319        }
320    }
321}
322
323impl<Req, B: Block> From<FromOrchestrator> for FromEngine<Req, B> {
324    fn from(event: FromOrchestrator) -> Self {
325        Self::Event(event)
326    }
327}
328
329/// Requests produced by a [`EngineRequestHandler`].
330#[derive(Debug)]
331pub enum RequestHandlerEvent<T> {
332    /// An event emitted by the handler.
333    HandlerEvent(HandlerEvent<T>),
334    /// Request to download blocks.
335    Download(DownloadRequest),
336}
337
338/// A request to download blocks from the network.
339#[derive(Debug)]
340pub enum DownloadRequest {
341    /// Download the given set of blocks.
342    BlockSet(B256Set),
343    /// Download the given range of blocks.
344    BlockRange(B256, u64),
345}
346
347impl DownloadRequest {
348    /// Returns a [`DownloadRequest`] for a single block.
349    pub fn single_block(hash: B256) -> Self {
350        Self::BlockSet(B256Set::from_iter([hash]))
351    }
352}