1use crate::{
4 backfill::BackfillAction,
5 chain::{ChainHandler, FromOrchestrator, HandlerEvent},
6 download::{BlockDownloader, DownloadAction, DownloadOutcome},
7};
8use alloy_primitives::B256;
9use futures::{Stream, StreamExt};
10use reth_chain_state::ExecutedBlockWithTrieUpdates;
11use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineTypes};
12use reth_ethereum_primitives::EthPrimitives;
13use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
14use std::{
15 collections::HashSet,
16 fmt::Display,
17 sync::mpsc::Sender,
18 task::{ready, Context, Poll},
19};
20use tokio::sync::mpsc::UnboundedReceiver;
21
/// Glue type that ties together a request handler, a source of incoming requests and a block
/// downloader.
///
/// The three parts are driven together by the [`ChainHandler`] implementation for this type.
#[derive(Debug)]
pub struct EngineHandler<T, S, D> {
    /// Consumes requests, orchestrator events and downloaded blocks, and produces events.
    handler: T,
    /// Source of incoming requests that are forwarded to the handler.
    incoming_requests: S,
    /// Downloader that receives the download requests emitted by the handler.
    downloader: D,
}
50
51impl<T, S, D> EngineHandler<T, S, D> {
52 pub const fn new(handler: T, downloader: D, incoming_requests: S) -> Self
55 where
56 T: EngineRequestHandler,
57 {
58 Self { handler, incoming_requests, downloader }
59 }
60
61 pub fn handler_mut(&mut self) -> &mut T {
63 &mut self.handler
64 }
65}
66
67impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
68where
69 T: EngineRequestHandler<Block = D::Block>,
70 S: Stream + Send + Sync + Unpin + 'static,
71 <S as Stream>::Item: Into<T::Request>,
72 D: BlockDownloader,
73{
74 type Event = T::Event;
75
76 fn on_event(&mut self, event: FromOrchestrator) {
77 self.handler.on_event(event.into());
79 }
80
81 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>> {
82 loop {
83 while let Poll::Ready(ev) = self.handler.poll(cx) {
85 match ev {
86 RequestHandlerEvent::HandlerEvent(ev) => {
87 return match ev {
88 HandlerEvent::BackfillAction(target) => {
89 self.downloader.on_action(DownloadAction::Clear);
91 Poll::Ready(HandlerEvent::BackfillAction(target))
92 }
93 HandlerEvent::Event(ev) => {
94 Poll::Ready(HandlerEvent::Event(ev))
96 }
97 HandlerEvent::FatalError => Poll::Ready(HandlerEvent::FatalError),
98 }
99 }
100 RequestHandlerEvent::Download(req) => {
101 self.downloader.on_action(DownloadAction::Download(req));
103 }
104 }
105 }
106
107 if let Poll::Ready(Some(req)) = self.incoming_requests.poll_next_unpin(cx) {
109 self.handler.on_event(FromEngine::Request(req.into()));
111 continue
113 }
114
115 if let Poll::Ready(outcome) = self.downloader.poll(cx) {
117 if let DownloadOutcome::Blocks(blocks) = outcome {
118 self.handler.on_event(FromEngine::DownloadedBlocks(blocks));
120 }
121 continue
122 }
123
124 return Poll::Pending
125 }
126 }
127}
128
129pub trait EngineRequestHandler: Send + Sync {
138 type Event: Send;
140 type Request;
142 type Block: Block;
144
145 fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>);
147
148 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>>;
150}
151
152#[derive(Debug)]
172pub struct EngineApiRequestHandler<Request, N: NodePrimitives> {
173 to_tree: Sender<FromEngine<Request, N::Block>>,
175 from_tree: UnboundedReceiver<EngineApiEvent<N>>,
177}
178
179impl<Request, N: NodePrimitives> EngineApiRequestHandler<Request, N> {
180 pub const fn new(
182 to_tree: Sender<FromEngine<Request, N::Block>>,
183 from_tree: UnboundedReceiver<EngineApiEvent<N>>,
184 ) -> Self {
185 Self { to_tree, from_tree }
186 }
187}
188
189impl<Request, N: NodePrimitives> EngineRequestHandler for EngineApiRequestHandler<Request, N>
190where
191 Request: Send,
192{
193 type Event = BeaconConsensusEngineEvent<N>;
194 type Request = Request;
195 type Block = N::Block;
196
197 fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>) {
198 let _ = self.to_tree.send(event);
200 }
201
202 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>> {
203 let Some(ev) = ready!(self.from_tree.poll_recv(cx)) else {
204 return Poll::Ready(RequestHandlerEvent::HandlerEvent(HandlerEvent::FatalError))
205 };
206
207 let ev = match ev {
208 EngineApiEvent::BeaconConsensus(ev) => {
209 RequestHandlerEvent::HandlerEvent(HandlerEvent::Event(ev))
210 }
211 EngineApiEvent::BackfillAction(action) => {
212 RequestHandlerEvent::HandlerEvent(HandlerEvent::BackfillAction(action))
213 }
214 EngineApiEvent::Download(action) => RequestHandlerEvent::Download(action),
215 };
216 Poll::Ready(ev)
217 }
218}
219
/// Selects which kind of engine API is in use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EngineApiKind {
    /// The Ethereum kind; this is the default.
    #[default]
    Ethereum,
    /// The OP Stack kind.
    OpStack,
}

impl EngineApiKind {
    /// Returns `true` only for [`EngineApiKind::Ethereum`].
    pub const fn is_ethereum(&self) -> bool {
        match *self {
            Self::Ethereum => true,
            Self::OpStack => false,
        }
    }

    /// Returns `true` only for [`EngineApiKind::OpStack`].
    pub const fn is_opstack(&self) -> bool {
        match *self {
            Self::OpStack => true,
            Self::Ethereum => false,
        }
    }
}
241
242#[derive(Debug)]
244pub enum EngineApiRequest<T: EngineTypes, N: NodePrimitives> {
245 Beacon(BeaconEngineMessage<T>),
247 InsertExecutedBlock(ExecutedBlockWithTrieUpdates<N>),
249}
250
251impl<T: EngineTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 match self {
254 Self::Beacon(msg) => msg.fmt(f),
255 Self::InsertExecutedBlock(block) => {
256 write!(f, "InsertExecutedBlock({:?})", block.recovered_block().num_hash())
257 }
258 }
259 }
260}
261
262impl<T: EngineTypes, N: NodePrimitives> From<BeaconEngineMessage<T>> for EngineApiRequest<T, N> {
263 fn from(msg: BeaconEngineMessage<T>) -> Self {
264 Self::Beacon(msg)
265 }
266}
267
268impl<T: EngineTypes, N: NodePrimitives> From<EngineApiRequest<T, N>>
269 for FromEngine<EngineApiRequest<T, N>, N::Block>
270{
271 fn from(req: EngineApiRequest<T, N>) -> Self {
272 Self::Request(req)
273 }
274}
275
276#[derive(Debug)]
278pub enum EngineApiEvent<N: NodePrimitives = EthPrimitives> {
279 BeaconConsensus(BeaconConsensusEngineEvent<N>),
282 BackfillAction(BackfillAction),
284 Download(DownloadRequest),
286}
287
288impl<N: NodePrimitives> EngineApiEvent<N> {
289 pub const fn is_backfill_action(&self) -> bool {
291 matches!(self, Self::BackfillAction(_))
292 }
293}
294
295impl<N: NodePrimitives> From<BeaconConsensusEngineEvent<N>> for EngineApiEvent<N> {
296 fn from(event: BeaconConsensusEngineEvent<N>) -> Self {
297 Self::BeaconConsensus(event)
298 }
299}
300
301#[derive(Debug)]
303pub enum FromEngine<Req, B: Block> {
304 Event(FromOrchestrator),
306 Request(Req),
308 DownloadedBlocks(Vec<RecoveredBlock<B>>),
310}
311
312impl<Req: Display, B: Block> Display for FromEngine<Req, B> {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 match self {
315 Self::Event(ev) => write!(f, "Event({ev:?})"),
316 Self::Request(req) => write!(f, "Request({req})"),
317 Self::DownloadedBlocks(blocks) => {
318 write!(f, "DownloadedBlocks({} blocks)", blocks.len())
319 }
320 }
321 }
322}
323
324impl<Req, B: Block> From<FromOrchestrator> for FromEngine<Req, B> {
325 fn from(event: FromOrchestrator) -> Self {
326 Self::Event(event)
327 }
328}
329
330#[derive(Debug)]
332pub enum RequestHandlerEvent<T> {
333 HandlerEvent(HandlerEvent<T>),
335 Download(DownloadRequest),
337}
338
339#[derive(Debug)]
341pub enum DownloadRequest {
342 BlockSet(HashSet<B256>),
344 BlockRange(B256, u64),
346}
347
348impl DownloadRequest {
349 pub fn single_block(hash: B256) -> Self {
351 Self::BlockSet(HashSet::from([hash]))
352 }
353}