1use crate::{
4 backfill::BackfillAction,
5 chain::{ChainHandler, FromOrchestrator, HandlerEvent},
6 download::{BlockDownloader, DownloadAction, DownloadOutcome},
7};
8use alloy_primitives::{map::B256Set, B256};
9use crossbeam_channel::Sender;
10use futures::{Stream, StreamExt};
11use reth_engine_primitives::{BeaconEngineMessage, ConsensusEngineEvent};
12use reth_ethereum_primitives::EthPrimitives;
13use reth_payload_primitives::{BuiltPayloadExecutedBlock, PayloadTypes};
14use reth_primitives_traits::{Block, NodePrimitives, SealedBlock};
15use std::{
16 fmt::Display,
17 task::{ready, Context, Poll},
18};
19use tokio::sync::mpsc::UnboundedReceiver;
20
21#[derive(Debug)]
39pub struct EngineHandler<T, S, D> {
40 handler: T,
44 incoming_requests: S,
46 downloader: D,
48}
49
50impl<T, S, D> EngineHandler<T, S, D> {
51 pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self
54 where
55 T: EngineRequestHandler,
56 {
57 Self { handler, incoming_requests, downloader }
58 }
59
60 pub const fn handler_mut(&mut self) -> &mut T {
62 &mut self.handler
63 }
64}
65
66impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
67where
68 T: EngineRequestHandler<Block = D::Block>,
69 S: Stream + Send + Sync + Unpin + 'static,
70 <S as Stream>::Item: Into<T::Request>,
71 D: BlockDownloader,
72{
73 type Event = T::Event;
74
75 fn on_event(&mut self, event: FromOrchestrator) {
76 self.handler.on_event(event.into());
78 }
79
80 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>> {
81 loop {
82 while let Poll::Ready(ev) = self.handler.poll(cx) {
84 match ev {
85 RequestHandlerEvent::HandlerEvent(ev) => {
86 return match ev {
87 HandlerEvent::BackfillAction(target) => {
88 self.downloader.on_action(DownloadAction::Clear);
90 Poll::Ready(HandlerEvent::BackfillAction(target))
91 }
92 HandlerEvent::Event(ev) => {
93 Poll::Ready(HandlerEvent::Event(ev))
95 }
96 HandlerEvent::FatalError => Poll::Ready(HandlerEvent::FatalError),
97 }
98 }
99 RequestHandlerEvent::Download(req) => {
100 self.downloader.on_action(DownloadAction::Download(req));
102 }
103 }
104 }
105
106 if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
108 self.handler.on_event(FromEngine::Request(req.into()));
110 continue
112 }
113
114 if let Poll::Ready(outcome) = self.downloader.poll(cx) {
116 if let DownloadOutcome::Blocks(blocks) = outcome {
117 self.handler.on_event(FromEngine::DownloadedBlocks(blocks));
119 }
120 continue
121 }
122
123 return Poll::Pending
124 }
125 }
126}
127
128pub trait EngineRequestHandler: Send + Sync {
137 type Event: Send;
139 type Request;
141 type Block: Block;
143
144 fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>);
146
147 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>>;
149}
150
151#[derive(Debug)]
171pub struct EngineApiRequestHandler<Request, N: NodePrimitives> {
172 to_tree: Sender<FromEngine<Request, N::Block>>,
174 from_tree: UnboundedReceiver<EngineApiEvent<N>>,
176}
177
178impl<Request, N: NodePrimitives> EngineApiRequestHandler<Request, N> {
179 pub const fn new(
181 to_tree: Sender<FromEngine<Request, N::Block>>,
182 from_tree: UnboundedReceiver<EngineApiEvent<N>>,
183 ) -> Self {
184 Self { to_tree, from_tree }
185 }
186}
187
188impl<Request, N: NodePrimitives> EngineRequestHandler for EngineApiRequestHandler<Request, N>
189where
190 Request: Send,
191{
192 type Event = ConsensusEngineEvent<N>;
193 type Request = Request;
194 type Block = N::Block;
195
196 fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>) {
197 let _ = self.to_tree.send(event);
199 }
200
201 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
202 let Some(ev) = ready!(self.from_tree.poll_recv(cx)) else {
203 return Poll::Ready(RequestHandlerEvent::HandlerEvent(HandlerEvent::FatalError))
204 };
205
206 let ev = match ev {
207 EngineApiEvent::BeaconConsensus(ev) => {
208 RequestHandlerEvent::HandlerEvent(HandlerEvent::Event(ev))
209 }
210 EngineApiEvent::BackfillAction(action) => {
211 RequestHandlerEvent::HandlerEvent(HandlerEvent::BackfillAction(action))
212 }
213 EngineApiEvent::Download(action) => RequestHandlerEvent::Download(action),
214 };
215 Poll::Ready(ev)
216 }
217}
218
219#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
221pub enum EngineApiKind {
222 #[default]
224 Ethereum,
225 OpStack,
227}
228
229impl EngineApiKind {
230 pub const fn is_ethereum(&self) -> bool {
232 matches!(self, Self::Ethereum)
233 }
234
235 pub const fn is_opstack(&self) -> bool {
237 matches!(self, Self::OpStack)
238 }
239}
240
241#[derive(Debug)]
243pub enum EngineApiRequest<T: PayloadTypes, N: NodePrimitives> {
244 Beacon(BeaconEngineMessage<T>),
246 InsertExecutedBlock(BuiltPayloadExecutedBlock<N>),
248}
249
250impl<T: PayloadTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 match self {
253 Self::Beacon(msg) => msg.fmt(f),
254 Self::InsertExecutedBlock(payload) => {
255 write!(f, "InsertExecutedBlock({:?})", payload.recovered_block.num_hash())
256 }
257 }
258 }
259}
260
261impl<T: PayloadTypes, N: NodePrimitives> From<BeaconEngineMessage<T>> for EngineApiRequest<T, N> {
262 fn from(msg: BeaconEngineMessage<T>) -> Self {
263 Self::Beacon(msg)
264 }
265}
266
267impl<T: PayloadTypes, N: NodePrimitives> From<EngineApiRequest<T, N>>
268 for FromEngine<EngineApiRequest<T, N>, N::Block>
269{
270 fn from(req: EngineApiRequest<T, N>) -> Self {
271 Self::Request(req)
272 }
273}
274
275#[derive(Debug)]
277pub enum EngineApiEvent<N: NodePrimitives = EthPrimitives> {
278 BeaconConsensus(ConsensusEngineEvent<N>),
281 BackfillAction(BackfillAction),
283 Download(DownloadRequest),
285}
286
287impl<N: NodePrimitives> EngineApiEvent<N> {
288 pub const fn is_backfill_action(&self) -> bool {
290 matches!(self, Self::BackfillAction(_))
291 }
292}
293
294impl<N: NodePrimitives> From<ConsensusEngineEvent<N>> for EngineApiEvent<N> {
295 fn from(event: ConsensusEngineEvent<N>) -> Self {
296 Self::BeaconConsensus(event)
297 }
298}
299
300#[derive(Debug)]
302pub enum FromEngine<Req, B: Block> {
303 Event(FromOrchestrator),
305 Request(Req),
307 DownloadedBlocks(Vec<SealedBlock<B>>),
309}
310
311impl<Req: Display, B: Block> Display for FromEngine<Req, B> {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 match self {
314 Self::Event(ev) => write!(f, "Event({ev:?})"),
315 Self::Request(req) => write!(f, "Request({req})"),
316 Self::DownloadedBlocks(blocks) => {
317 write!(f, "DownloadedBlocks({} blocks)", blocks.len())
318 }
319 }
320 }
321}
322
323impl<Req, B: Block> From<FromOrchestrator> for FromEngine<Req, B> {
324 fn from(event: FromOrchestrator) -> Self {
325 Self::Event(event)
326 }
327}
328
329#[derive(Debug)]
331pub enum RequestHandlerEvent<T> {
332 HandlerEvent(HandlerEvent<T>),
334 Download(DownloadRequest),
336}
337
338#[derive(Debug)]
340pub enum DownloadRequest {
341 BlockSet(B256Set),
343 BlockRange(B256, u64),
345}
346
347impl DownloadRequest {
348 pub fn single_block(hash: B256) -> Self {
350 Self::BlockSet(B256Set::from_iter([hash]))
351 }
352}