1use crate::{
4 backfill::BackfillAction,
5 chain::{ChainHandler, FromOrchestrator, HandlerEvent},
6 download::{BlockDownloader, DownloadAction, DownloadOutcome},
7};
8use alloy_primitives::B256;
9use futures::{Stream, StreamExt};
10use reth_chain_state::ExecutedBlockWithTrieUpdates;
11use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage};
12use reth_ethereum_primitives::EthPrimitives;
13use reth_payload_primitives::PayloadTypes;
14use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
15use std::{
16 collections::HashSet,
17 fmt::Display,
18 sync::mpsc::Sender,
19 task::{ready, Context, Poll},
20};
21use tokio::sync::mpsc::UnboundedReceiver;
22
/// Bundles a request handler, a source of incoming requests and a block downloader.
///
/// The three parts are driven together by this type's `ChainHandler` implementation: output
/// of the handler is either returned to the caller or routed to the downloader, incoming
/// requests are forwarded to the handler, and downloaded blocks are handed back to the handler.
#[derive(Debug)]
pub struct EngineHandler<T, S, D> {
    /// Consumes requests, orchestrator events and downloaded blocks, and emits events.
    handler: T,
    /// Source of incoming requests that are forwarded to `handler`.
    incoming_requests: S,
    /// Receives download/clear actions and yields download outcomes.
    downloader: D,
}
51
52impl<T, S, D> EngineHandler<T, S, D> {
53 pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self
56 where
57 T: EngineRequestHandler,
58 {
59 Self { handler, incoming_requests, downloader }
60 }
61
62 pub const fn handler_mut(&mut self) -> &mut T {
64 &mut self.handler
65 }
66}
67
68impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
69where
70 T: EngineRequestHandler<Block = D::Block>,
71 S: Stream + Send + Sync + Unpin + 'static,
72 <S as Stream>::Item: Into<T::Request>,
73 D: BlockDownloader,
74{
75 type Event = T::Event;
76
77 fn on_event(&mut self, event: FromOrchestrator) {
78 self.handler.on_event(event.into());
80 }
81
82 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>> {
83 loop {
84 while let Poll::Ready(ev) = self.handler.poll(cx) {
86 match ev {
87 RequestHandlerEvent::HandlerEvent(ev) => {
88 return match ev {
89 HandlerEvent::BackfillAction(target) => {
90 self.downloader.on_action(DownloadAction::Clear);
92 Poll::Ready(HandlerEvent::BackfillAction(target))
93 }
94 HandlerEvent::Event(ev) => {
95 Poll::Ready(HandlerEvent::Event(ev))
97 }
98 HandlerEvent::FatalError => Poll::Ready(HandlerEvent::FatalError),
99 }
100 }
101 RequestHandlerEvent::Download(req) => {
102 self.downloader.on_action(DownloadAction::Download(req));
104 }
105 }
106 }
107
108 if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
110 self.handler.on_event(FromEngine::Request(req.into()));
112 continue
114 }
115
116 if let Poll::Ready(outcome) = self.downloader.poll(cx) {
118 if let DownloadOutcome::Blocks(blocks) = outcome {
119 self.handler.on_event(FromEngine::DownloadedBlocks(blocks));
121 }
122 continue
123 }
124
125 return Poll::Pending
126 }
127 }
128}
129
130pub trait EngineRequestHandler: Send + Sync {
139 type Event: Send;
141 type Request;
143 type Block: Block;
145
146 fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>);
148
149 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>>;
151}
152
153#[derive(Debug)]
173pub struct EngineApiRequestHandler<Request, N: NodePrimitives> {
174 to_tree: Sender<FromEngine<Request, N::Block>>,
176 from_tree: UnboundedReceiver<EngineApiEvent<N>>,
178}
179
180impl<Request, N: NodePrimitives> EngineApiRequestHandler<Request, N> {
181 pub const fn new(
183 to_tree: Sender<FromEngine<Request, N::Block>>,
184 from_tree: UnboundedReceiver<EngineApiEvent<N>>,
185 ) -> Self {
186 Self { to_tree, from_tree }
187 }
188}
189
190impl<Request, N: NodePrimitives> EngineRequestHandler for EngineApiRequestHandler<Request, N>
191where
192 Request: Send,
193{
194 type Event = BeaconConsensusEngineEvent<N>;
195 type Request = Request;
196 type Block = N::Block;
197
198 fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>) {
199 let _ = self.to_tree.send(event);
201 }
202
203 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
204 let Some(ev) = ready!(self.from_tree.poll_recv(cx)) else {
205 return Poll::Ready(RequestHandlerEvent::HandlerEvent(HandlerEvent::FatalError))
206 };
207
208 let ev = match ev {
209 EngineApiEvent::BeaconConsensus(ev) => {
210 RequestHandlerEvent::HandlerEvent(HandlerEvent::Event(ev))
211 }
212 EngineApiEvent::BackfillAction(action) => {
213 RequestHandlerEvent::HandlerEvent(HandlerEvent::BackfillAction(action))
214 }
215 EngineApiEvent::Download(action) => RequestHandlerEvent::Download(action),
216 };
217 Poll::Ready(ev)
218 }
219}
220
/// Selects which kind of engine API is in use.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum EngineApiKind {
    /// The Ethereum kind; this is the default value.
    #[default]
    Ethereum,
    /// The OP Stack kind.
    OpStack,
}
230
231impl EngineApiKind {
232 pub const fn is_ethereum(&self) -> bool {
234 matches!(self, Self::Ethereum)
235 }
236
237 pub const fn is_opstack(&self) -> bool {
239 matches!(self, Self::OpStack)
240 }
241}
242
243#[derive(Debug)]
245pub enum EngineApiRequest<T: PayloadTypes, N: NodePrimitives> {
246 Beacon(BeaconEngineMessage<T>),
248 InsertExecutedBlock(ExecutedBlockWithTrieUpdates<N>),
250}
251
252impl<T: PayloadTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 match self {
255 Self::Beacon(msg) => msg.fmt(f),
256 Self::InsertExecutedBlock(block) => {
257 write!(f, "InsertExecutedBlock({:?})", block.recovered_block().num_hash())
258 }
259 }
260 }
261}
262
263impl<T: PayloadTypes, N: NodePrimitives> From<BeaconEngineMessage<T>> for EngineApiRequest<T, N> {
264 fn from(msg: BeaconEngineMessage<T>) -> Self {
265 Self::Beacon(msg)
266 }
267}
268
269impl<T: PayloadTypes, N: NodePrimitives> From<EngineApiRequest<T, N>>
270 for FromEngine<EngineApiRequest<T, N>, N::Block>
271{
272 fn from(req: EngineApiRequest<T, N>) -> Self {
273 Self::Request(req)
274 }
275}
276
277#[derive(Debug)]
279pub enum EngineApiEvent<N: NodePrimitives = EthPrimitives> {
280 BeaconConsensus(BeaconConsensusEngineEvent<N>),
283 BackfillAction(BackfillAction),
285 Download(DownloadRequest),
287}
288
289impl<N: NodePrimitives> EngineApiEvent<N> {
290 pub const fn is_backfill_action(&self) -> bool {
292 matches!(self, Self::BackfillAction(_))
293 }
294}
295
296impl<N: NodePrimitives> From<BeaconConsensusEngineEvent<N>> for EngineApiEvent<N> {
297 fn from(event: BeaconConsensusEngineEvent<N>) -> Self {
298 Self::BeaconConsensus(event)
299 }
300}
301
302#[derive(Debug)]
304pub enum FromEngine<Req, B: Block> {
305 Event(FromOrchestrator),
307 Request(Req),
309 DownloadedBlocks(Vec<RecoveredBlock<B>>),
311}
312
313impl<Req: Display, B: Block> Display for FromEngine<Req, B> {
314 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315 match self {
316 Self::Event(ev) => write!(f, "Event({ev:?})"),
317 Self::Request(req) => write!(f, "Request({req})"),
318 Self::DownloadedBlocks(blocks) => {
319 write!(f, "DownloadedBlocks({} blocks)", blocks.len())
320 }
321 }
322 }
323}
324
325impl<Req, B: Block> From<FromOrchestrator> for FromEngine<Req, B> {
326 fn from(event: FromOrchestrator) -> Self {
327 Self::Event(event)
328 }
329}
330
331#[derive(Debug)]
333pub enum RequestHandlerEvent<T> {
334 HandlerEvent(HandlerEvent<T>),
336 Download(DownloadRequest),
338}
339
340#[derive(Debug)]
342pub enum DownloadRequest {
343 BlockSet(HashSet<B256>),
345 BlockRange(B256, u64),
347}
348
349impl DownloadRequest {
350 pub fn single_block(hash: B256) -> Self {
352 Self::BlockSet(HashSet::from([hash]))
353 }
354}