reth_stages_api/stage.rs
1use crate::{error::StageError, StageCheckpoint, StageId};
2use alloy_primitives::{BlockNumber, TxNumber};
3use reth_provider::{BlockReader, ProviderError};
4use std::{
5 cmp::{max, min},
6 future::{poll_fn, Future},
7 ops::{Range, RangeInclusive},
8 task::{Context, Poll},
9};
10
11/// Stage execution input, see [`Stage::execute`].
12#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
13pub struct ExecInput {
14 /// The target block number the stage needs to execute towards.
15 pub target: Option<BlockNumber>,
16 /// The checkpoint of this stage the last time it was executed.
17 pub checkpoint: Option<StageCheckpoint>,
18}
19
20impl ExecInput {
21 /// Return the checkpoint of the stage or default.
22 pub fn checkpoint(&self) -> StageCheckpoint {
23 self.checkpoint.unwrap_or_default()
24 }
25
26 /// Return the next block number after the current
27 /// +1 is needed to skip the present block and always start from block number 1, not 0.
28 pub fn next_block(&self) -> BlockNumber {
29 let current_block = self.checkpoint();
30 current_block.block_number + 1
31 }
32
33 /// Returns `true` if the target block number has already been reached.
34 pub fn target_reached(&self) -> bool {
35 self.checkpoint().block_number >= self.target()
36 }
37
38 /// Return the target block number or default.
39 pub fn target(&self) -> BlockNumber {
40 self.target.unwrap_or_default()
41 }
42
43 /// Return next block range that needs to be executed.
44 pub fn next_block_range(&self) -> RangeInclusive<BlockNumber> {
45 let (range, _) = self.next_block_range_with_threshold(u64::MAX);
46 range
47 }
48
49 /// Return true if this is the first block range to execute.
50 pub const fn is_first_range(&self) -> bool {
51 self.checkpoint.is_none()
52 }
53
54 /// Return the next block range to execute.
55 /// Return pair of the block range and if this is final block range.
56 pub fn next_block_range_with_threshold(
57 &self,
58 threshold: u64,
59 ) -> (RangeInclusive<BlockNumber>, bool) {
60 let current_block = self.checkpoint();
61 let start = current_block.block_number + 1;
62 let target = self.target();
63
64 let end = min(target, current_block.block_number.saturating_add(threshold));
65
66 let is_final_range = end == target;
67 (start..=end, is_final_range)
68 }
69
70 /// Return the next block range determined the number of transactions within it.
71 /// This function walks the block indices until either the end of the range is reached or
72 /// the number of transactions exceeds the threshold.
73 pub fn next_block_range_with_transaction_threshold<Provider>(
74 &self,
75 provider: &Provider,
76 tx_threshold: u64,
77 ) -> Result<(Range<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError>
78 where
79 Provider: BlockReader,
80 {
81 let start_block = self.next_block();
82 let target_block = self.target();
83
84 let start_block_body = provider
85 .block_body_indices(start_block)?
86 .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?;
87 let first_tx_num = start_block_body.first_tx_num();
88
89 let target_block_body = provider
90 .block_body_indices(target_block)?
91 .ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?;
92
93 // number of transactions left to execute.
94 let all_tx_cnt = target_block_body.next_tx_num() - first_tx_num;
95
96 if all_tx_cnt == 0 {
97 // if there is no more transaction return back.
98 return Ok((first_tx_num..first_tx_num, start_block..=target_block, true))
99 }
100
101 // get block of this tx
102 let (end_block, is_final_range, next_tx_num) = if all_tx_cnt <= tx_threshold {
103 (target_block, true, target_block_body.next_tx_num())
104 } else {
105 // get tx block number. next_tx_num in this case will be less than all_tx_cnt.
106 // So we are sure that transaction must exist.
107 let end_block_number = provider
108 .transaction_block(first_tx_num + tx_threshold)?
109 .expect("block of tx must exist");
110 // we want to get range of all transactions of this block, so we are fetching block
111 // body.
112 let end_block_body = provider
113 .block_body_indices(end_block_number)?
114 .ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?;
115 (end_block_number, false, end_block_body.next_tx_num())
116 };
117
118 let tx_range = first_tx_num..next_tx_num;
119 Ok((tx_range, start_block..=end_block, is_final_range))
120 }
121}
122
123/// Stage unwind input, see [`Stage::unwind`].
124#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
125pub struct UnwindInput {
126 /// The current highest checkpoint of the stage.
127 pub checkpoint: StageCheckpoint,
128 /// The block to unwind to.
129 pub unwind_to: BlockNumber,
130 /// The bad block that caused the unwind, if any.
131 pub bad_block: Option<BlockNumber>,
132}
133
134impl UnwindInput {
135 /// Return next block range that needs to be unwound.
136 pub fn unwind_block_range(&self) -> RangeInclusive<BlockNumber> {
137 self.unwind_block_range_with_threshold(u64::MAX).0
138 }
139
140 /// Return the next block range to unwind and the block we're unwinding to.
141 pub fn unwind_block_range_with_threshold(
142 &self,
143 threshold: u64,
144 ) -> (RangeInclusive<BlockNumber>, BlockNumber, bool) {
145 // +1 is to skip the block we're unwinding to
146 let mut start = self.unwind_to + 1;
147 let end = self.checkpoint;
148
149 start = max(start, end.block_number.saturating_sub(threshold));
150
151 let unwind_to = start - 1;
152
153 let is_final_range = unwind_to == self.unwind_to;
154 (start..=end.block_number, unwind_to, is_final_range)
155 }
156}
157
158/// The output of a stage execution.
159#[derive(Debug, PartialEq, Eq, Clone)]
160pub struct ExecOutput {
161 /// How far the stage got.
162 pub checkpoint: StageCheckpoint,
163 /// Whether or not the stage is done.
164 pub done: bool,
165}
166
167impl ExecOutput {
168 /// Mark the stage as done, checkpointing at the given place.
169 pub const fn done(checkpoint: StageCheckpoint) -> Self {
170 Self { checkpoint, done: true }
171 }
172}
173
174/// The output of a stage unwinding.
175#[derive(Debug, PartialEq, Eq, Clone)]
176pub struct UnwindOutput {
177 /// The checkpoint at which the stage has unwound to.
178 pub checkpoint: StageCheckpoint,
179}
180
181/// A stage is a segmented part of the syncing process of the node.
182///
183/// Each stage takes care of a well-defined task, such as downloading headers or executing
184/// transactions, and persist their results to a database.
185///
186/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards"
187/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]).
188///
189/// Stages are executed as part of a pipeline where they are executed serially.
190///
191/// Stages receive [`DBProvider`](reth_provider::DBProvider).
192#[auto_impl::auto_impl(Box)]
193pub trait Stage<Provider>: Send + Sync {
194 /// Get the ID of the stage.
195 ///
196 /// Stage IDs must be unique.
197 fn id(&self) -> StageId;
198
199 /// Returns `Poll::Ready(Ok(()))` when the stage is ready to execute the given range.
200 ///
201 /// This method is heavily inspired by [tower](https://crates.io/crates/tower)'s `Service` trait.
202 /// Any asynchronous tasks or communication should be handled in `poll_execute_ready`, e.g.
203 /// moving downloaded items from downloaders to an internal buffer in the stage.
204 ///
205 /// If the stage has any pending external state, then `Poll::Pending` is returned.
206 ///
207 /// If `Poll::Ready(Err(_))` is returned, the stage may not be able to execute anymore
208 /// depending on the specific error. In that case, an unwind must be issued instead.
209 ///
210 /// Once `Poll::Ready(Ok(()))` is returned, the stage may be executed once using `execute`.
211 /// Until the stage has been executed, repeated calls to `poll_execute_ready` must return either
212 /// `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`.
213 ///
214 /// Note that `poll_execute_ready` may reserve shared resources that are consumed in a
215 /// subsequent call of `execute`, e.g. internal buffers. It is crucial for implementations
216 /// to not assume that `execute` will always be invoked and to ensure that those resources
217 /// are appropriately released if the stage is dropped before `execute` is called.
218 ///
219 /// For the same reason, it is also important that any shared resources do not exhibit
220 /// unbounded growth on repeated calls to `poll_execute_ready`.
221 ///
222 /// Unwinds may happen without consulting `poll_execute_ready` first.
223 fn poll_execute_ready(
224 &mut self,
225 _cx: &mut Context<'_>,
226 _input: ExecInput,
227 ) -> Poll<Result<(), StageError>> {
228 Poll::Ready(Ok(()))
229 }
230
231 /// Execute the stage.
232 /// It is expected that the stage will write all necessary data to the database
233 /// upon invoking this method.
234 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError>;
235
236 /// Post execution commit hook.
237 ///
238 /// This is called after the stage has been executed and the data has been committed by the
239 /// provider. The stage may want to pass some data from [`Self::execute`] via the internal
240 /// field.
241 fn post_execute_commit(&mut self) -> Result<(), StageError> {
242 Ok(())
243 }
244
245 /// Unwind the stage.
246 fn unwind(
247 &mut self,
248 provider: &Provider,
249 input: UnwindInput,
250 ) -> Result<UnwindOutput, StageError>;
251
252 /// Post unwind commit hook.
253 ///
254 /// This is called after the stage has been unwound and the data has been committed by the
255 /// provider. The stage may want to pass some data from [`Self::unwind`] via the internal
256 /// field.
257 fn post_unwind_commit(&mut self) -> Result<(), StageError> {
258 Ok(())
259 }
260}
261
262/// [Stage] trait extension.
263pub trait StageExt<Provider>: Stage<Provider> {
264 /// Utility extension for the `Stage` trait that invokes `Stage::poll_execute_ready`
265 /// with [`poll_fn`] context. For more information see [`Stage::poll_execute_ready`].
266 fn execute_ready(
267 &mut self,
268 input: ExecInput,
269 ) -> impl Future<Output = Result<(), StageError>> + Send {
270 poll_fn(move |cx| self.poll_execute_ready(cx, input))
271 }
272}
273
274impl<Provider, S: Stage<Provider>> StageExt<Provider> for S {}