reth_engine_tree/chain.rs
1use crate::backfill::{BackfillAction, BackfillEvent, BackfillSync};
2use futures::Stream;
3use reth_stages_api::{ControlFlow, PipelineTarget};
4use std::{
5 fmt::{Display, Formatter, Result},
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tracing::*;
10
11/// The type that drives the chain forward.
12///
13/// A state machine that orchestrates the components responsible for advancing the chain
14///
15///
16/// ## Control flow
17///
18/// The [`ChainOrchestrator`] is responsible for controlling the backfill sync and additional hooks.
19/// It polls the given `handler`, which is responsible for advancing the chain, how is up to the
20/// handler. However, due to database restrictions (e.g. exclusive write access), following
21/// invariants apply:
22/// - If the handler requests a backfill run (e.g. [`BackfillAction::Start`]), the handler must
23/// ensure that while the backfill sync is running, no other write access is granted.
24/// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database
25/// (e.g. if pruning is required), but will not do so until the handler has acknowledged the
26/// request for write access.
27///
28/// The [`ChainOrchestrator`] polls the [`ChainHandler`] to advance the chain and handles the
29/// emitted events. Requests and events are passed to the [`ChainHandler`] via
30/// [`ChainHandler::on_event`].
31#[must_use = "Stream does nothing unless polled"]
32#[derive(Debug)]
33pub struct ChainOrchestrator<T, P>
34where
35 T: ChainHandler,
36 P: BackfillSync,
37{
38 /// The handler for advancing the chain.
39 handler: T,
40 /// Controls backfill sync.
41 backfill_sync: P,
42}
43
44impl<T, P> ChainOrchestrator<T, P>
45where
46 T: ChainHandler + Unpin,
47 P: BackfillSync + Unpin,
48{
49 /// Creates a new [`ChainOrchestrator`] with the given handler and backfill sync.
50 pub const fn new(handler: T, backfill_sync: P) -> Self {
51 Self { handler, backfill_sync }
52 }
53
54 /// Returns the handler
55 pub const fn handler(&self) -> &T {
56 &self.handler
57 }
58
59 /// Returns a mutable reference to the handler
60 pub const fn handler_mut(&mut self) -> &mut T {
61 &mut self.handler
62 }
63
64 /// Triggers a backfill sync for the __valid__ given target.
65 ///
66 /// CAUTION: This function should be used with care and with a valid target.
67 pub fn start_backfill_sync(&mut self, target: impl Into<PipelineTarget>) {
68 self.backfill_sync.on_action(BackfillAction::Start(target.into()));
69 }
70
71 /// Internal function used to advance the chain.
72 ///
73 /// Polls the `ChainOrchestrator` for the next event.
74 #[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
75 fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent<T::Event>> {
76 let this = self.get_mut();
77
78 // This loop polls the components
79 //
80 // 1. Polls the backfill sync to completion, if active.
81 // 2. Advances the chain by polling the handler.
82 'outer: loop {
83 // try to poll the backfill sync to completion, if active
84 match this.backfill_sync.poll(cx) {
85 Poll::Ready(backfill_sync_event) => match backfill_sync_event {
86 BackfillEvent::Started(_) => {
87 // notify handler that backfill sync started
88 this.handler.on_event(FromOrchestrator::BackfillSyncStarted);
89 return Poll::Ready(ChainEvent::BackfillSyncStarted);
90 }
91 BackfillEvent::Finished(res) => {
92 return match res {
93 Ok(ctrl) => {
94 tracing::debug!(?ctrl, "backfill sync finished");
95 // notify handler that backfill sync finished
96 this.handler.on_event(FromOrchestrator::BackfillSyncFinished(ctrl));
97 Poll::Ready(ChainEvent::BackfillSyncFinished)
98 }
99 Err(err) => {
100 tracing::error!( %err, "backfill sync failed");
101 Poll::Ready(ChainEvent::FatalError)
102 }
103 }
104 }
105 BackfillEvent::TaskDropped(err) => {
106 tracing::error!( %err, "backfill sync task dropped");
107 return Poll::Ready(ChainEvent::FatalError);
108 }
109 },
110 Poll::Pending => {}
111 }
112
113 // poll the handler for the next event
114 match this.handler.poll(cx) {
115 Poll::Ready(handler_event) => {
116 match handler_event {
117 HandlerEvent::BackfillAction(action) => {
118 // forward action to backfill_sync
119 this.backfill_sync.on_action(action);
120 }
121 HandlerEvent::Event(ev) => {
122 // bubble up the event
123 return Poll::Ready(ChainEvent::Handler(ev));
124 }
125 HandlerEvent::FatalError => {
126 error!(target: "engine::tree", "Fatal error");
127 return Poll::Ready(ChainEvent::FatalError)
128 }
129 }
130 }
131 Poll::Pending => {
132 // no more events to process
133 break 'outer
134 }
135 }
136 }
137
138 Poll::Pending
139 }
140}
141
142impl<T, P> Stream for ChainOrchestrator<T, P>
143where
144 T: ChainHandler + Unpin,
145 P: BackfillSync + Unpin,
146{
147 type Item = ChainEvent<T::Event>;
148
149 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
150 self.as_mut().poll_next_event(cx).map(Some)
151 }
152}
153
154/// Event emitted by the [`ChainOrchestrator`]
155///
156/// These are meant to be used for observability and debugging purposes.
157#[derive(Debug)]
158pub enum ChainEvent<T> {
159 /// Backfill sync started
160 BackfillSyncStarted,
161 /// Backfill sync finished
162 BackfillSyncFinished,
163 /// Fatal error
164 FatalError,
165 /// Event emitted by the handler
166 Handler(T),
167}
168
169impl<T: Display> Display for ChainEvent<T> {
170 fn fmt(&self, f: &mut Formatter<'_>) -> Result {
171 match self {
172 Self::BackfillSyncStarted => {
173 write!(f, "BackfillSyncStarted")
174 }
175 Self::BackfillSyncFinished => {
176 write!(f, "BackfillSyncFinished")
177 }
178 Self::FatalError => {
179 write!(f, "FatalError")
180 }
181 Self::Handler(event) => {
182 write!(f, "Handler({event})")
183 }
184 }
185 }
186}
187
188/// A trait that advances the chain by handling actions.
189///
190/// This is intended to be implement the chain consensus logic, for example `engine` API.
191///
192/// ## Control flow
193///
194/// The [`ChainOrchestrator`] is responsible for advancing this handler through
195/// [`ChainHandler::poll`] and handling the emitted events, for example
196/// [`HandlerEvent::BackfillAction`] to start a backfill sync. Events from the [`ChainOrchestrator`]
197/// are passed to the handler via [`ChainHandler::on_event`], e.g.
198/// [`FromOrchestrator::BackfillSyncStarted`] once the backfill sync started or finished.
199pub trait ChainHandler: Send + Sync {
200 /// Event generated by this handler that orchestrator can bubble up;
201 type Event: Send;
202
203 /// Informs the handler about an event from the [`ChainOrchestrator`].
204 fn on_event(&mut self, event: FromOrchestrator);
205
206 /// Polls for actions that [`ChainOrchestrator`] should handle.
207 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>>;
208}
209
210/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
211#[derive(Clone, Debug)]
212pub enum HandlerEvent<T> {
213 /// Request an action to backfill sync
214 BackfillAction(BackfillAction),
215 /// Other event emitted by the handler
216 Event(T),
217 /// Fatal error
218 FatalError,
219}
220
221/// Internal events issued by the [`ChainOrchestrator`].
222#[derive(Clone, Debug)]
223pub enum FromOrchestrator {
224 /// Invoked when backfill sync finished
225 BackfillSyncFinished(ControlFlow),
226 /// Invoked when backfill sync started
227 BackfillSyncStarted,
228}