reth_stages_api/
stage.rs

1use crate::{error::StageError, StageCheckpoint, StageId};
2use alloy_primitives::{BlockNumber, TxNumber};
3use reth_provider::{BlockReader, ProviderError};
4use std::{
5    cmp::{max, min},
6    future::{poll_fn, Future},
7    ops::{Range, RangeInclusive},
8    task::{Context, Poll},
9};
10
/// Stage execution input, see [`Stage::execute`].
///
/// Both fields are optional. The accessor methods [`ExecInput::target`] and
/// [`ExecInput::checkpoint`] substitute the default value when a field is `None`.
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct ExecInput {
    /// The target block number the stage needs to execute towards.
    ///
    /// When `None`, [`ExecInput::target`] returns the default block number.
    pub target: Option<BlockNumber>,
    /// The checkpoint of this stage the last time it was executed.
    ///
    /// When `None`, [`ExecInput::is_first_range`] returns `true` and
    /// [`ExecInput::checkpoint`] returns the default checkpoint.
    pub checkpoint: Option<StageCheckpoint>,
}
19
20impl ExecInput {
21    /// Return the checkpoint of the stage or default.
22    pub fn checkpoint(&self) -> StageCheckpoint {
23        self.checkpoint.unwrap_or_default()
24    }
25
26    /// Return the next block number after the current
27    /// +1 is needed to skip the present block and always start from block number 1, not 0.
28    pub fn next_block(&self) -> BlockNumber {
29        let current_block = self.checkpoint();
30        current_block.block_number + 1
31    }
32
33    /// Returns `true` if the target block number has already been reached.
34    pub fn target_reached(&self) -> bool {
35        self.checkpoint().block_number >= self.target()
36    }
37
38    /// Return the target block number or default.
39    pub fn target(&self) -> BlockNumber {
40        self.target.unwrap_or_default()
41    }
42
43    /// Return next block range that needs to be executed.
44    pub fn next_block_range(&self) -> RangeInclusive<BlockNumber> {
45        let (range, _) = self.next_block_range_with_threshold(u64::MAX);
46        range
47    }
48
49    /// Return true if this is the first block range to execute.
50    pub const fn is_first_range(&self) -> bool {
51        self.checkpoint.is_none()
52    }
53
54    /// Return the next block range to execute.
55    /// Return pair of the block range and if this is final block range.
56    pub fn next_block_range_with_threshold(
57        &self,
58        threshold: u64,
59    ) -> (RangeInclusive<BlockNumber>, bool) {
60        let current_block = self.checkpoint();
61        let start = current_block.block_number + 1;
62        let target = self.target();
63
64        let end = min(target, current_block.block_number.saturating_add(threshold));
65
66        let is_final_range = end == target;
67        (start..=end, is_final_range)
68    }
69
70    /// Return the next block range determined the number of transactions within it.
71    /// This function walks the block indices until either the end of the range is reached or
72    /// the number of transactions exceeds the threshold.
73    pub fn next_block_range_with_transaction_threshold<Provider>(
74        &self,
75        provider: &Provider,
76        tx_threshold: u64,
77    ) -> Result<(Range<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError>
78    where
79        Provider: BlockReader,
80    {
81        let start_block = self.next_block();
82        let target_block = self.target();
83
84        let start_block_body = provider
85            .block_body_indices(start_block)?
86            .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?;
87        let first_tx_num = start_block_body.first_tx_num();
88
89        let target_block_body = provider
90            .block_body_indices(target_block)?
91            .ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?;
92
93        // number of transactions left to execute.
94        let all_tx_cnt = target_block_body.next_tx_num() - first_tx_num;
95
96        if all_tx_cnt == 0 {
97            // if there is no more transaction return back.
98            return Ok((first_tx_num..first_tx_num, start_block..=target_block, true))
99        }
100
101        // get block of this tx
102        let (end_block, is_final_range, next_tx_num) = if all_tx_cnt <= tx_threshold {
103            (target_block, true, target_block_body.next_tx_num())
104        } else {
105            // get tx block number. next_tx_num in this case will be less than all_tx_cnt.
106            // So we are sure that transaction must exist.
107            let end_block_number = provider
108                .transaction_block(first_tx_num + tx_threshold)?
109                .expect("block of tx must exist");
110            // we want to get range of all transactions of this block, so we are fetching block
111            // body.
112            let end_block_body = provider
113                .block_body_indices(end_block_number)?
114                .ok_or(ProviderError::BlockBodyIndicesNotFound(end_block_number))?;
115            (end_block_number, false, end_block_body.next_tx_num())
116        };
117
118        let tx_range = first_tx_num..next_tx_num;
119        Ok((tx_range, start_block..=end_block, is_final_range))
120    }
121}
122
/// Stage unwind input, see [`Stage::unwind`].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct UnwindInput {
    /// The current highest checkpoint of the stage.
    ///
    /// Its block number is the upper (inclusive) end of the range to unwind.
    pub checkpoint: StageCheckpoint,
    /// The block to unwind to.
    ///
    /// This block itself is skipped; the range to unwind begins at the block after it.
    pub unwind_to: BlockNumber,
    /// The bad block that caused the unwind, if any.
    pub bad_block: Option<BlockNumber>,
}
133
134impl UnwindInput {
135    /// Return next block range that needs to be unwound.
136    pub fn unwind_block_range(&self) -> RangeInclusive<BlockNumber> {
137        self.unwind_block_range_with_threshold(u64::MAX).0
138    }
139
140    /// Return the next block range to unwind and the block we're unwinding to.
141    pub fn unwind_block_range_with_threshold(
142        &self,
143        threshold: u64,
144    ) -> (RangeInclusive<BlockNumber>, BlockNumber, bool) {
145        // +1 is to skip the block we're unwinding to
146        let mut start = self.unwind_to + 1;
147        let end = self.checkpoint;
148
149        start = max(start, end.block_number.saturating_sub(threshold));
150
151        let unwind_to = start - 1;
152
153        let is_final_range = unwind_to == self.unwind_to;
154        (start..=end.block_number, unwind_to, is_final_range)
155    }
156}
157
/// The output of a stage execution.
///
/// Use [`ExecOutput::in_progress`] or [`ExecOutput::done`] to construct a value with the
/// corresponding `done` flag.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecOutput {
    /// How far the stage got.
    pub checkpoint: StageCheckpoint,
    /// Whether or not the stage is done.
    pub done: bool,
}
166
167impl ExecOutput {
168    /// Mark the stage as not done, checkpointing at the given place.
169    pub const fn in_progress(checkpoint: StageCheckpoint) -> Self {
170        Self { checkpoint, done: false }
171    }
172
173    /// Mark the stage as done, checkpointing at the given place.
174    pub const fn done(checkpoint: StageCheckpoint) -> Self {
175        Self { checkpoint, done: true }
176    }
177}
178
/// The output of a stage unwinding, see [`Stage::unwind`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnwindOutput {
    /// The checkpoint at which the stage has unwound to.
    pub checkpoint: StageCheckpoint,
}
185
/// A stage is a segmented part of the syncing process of the node.
///
/// Each stage takes care of a well-defined task, such as downloading headers or executing
/// transactions, and persists its results to a database.
///
/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards"
/// ([`Stage::execute`]) and a way to "roll back" ([`Stage::unwind`]).
///
/// Stages are executed as part of a pipeline where they are executed serially.
///
/// Stages receive [`DBProvider`](reth_provider::DBProvider).
#[auto_impl::auto_impl(Box)]
pub trait Stage<Provider>: Send + Sync {
    /// Get the ID of the stage.
    ///
    /// Stage IDs must be unique.
    fn id(&self) -> StageId;

    /// Returns `Poll::Ready(Ok(()))` when the stage is ready to execute the given range.
    ///
    /// This method is heavily inspired by [tower](https://crates.io/crates/tower)'s `Service` trait.
    /// Any asynchronous tasks or communication should be handled in `poll_execute_ready`, e.g.
    /// moving downloaded items from downloaders to an internal buffer in the stage.
    ///
    /// If the stage has any pending external state, then `Poll::Pending` is returned.
    ///
    /// If `Poll::Ready(Err(_))` is returned, the stage may not be able to execute anymore
    /// depending on the specific error. In that case, an unwind must be issued instead.
    ///
    /// Once `Poll::Ready(Ok(()))` is returned, the stage may be executed once using `execute`.
    /// Until the stage has been executed, repeated calls to `poll_execute_ready` must return either
    /// `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`.
    ///
    /// Note that `poll_execute_ready` may reserve shared resources that are consumed in a
    /// subsequent call of `execute`, e.g. internal buffers. It is crucial for implementations
    /// to not assume that `execute` will always be invoked and to ensure that those resources
    /// are appropriately released if the stage is dropped before `execute` is called.
    ///
    /// For the same reason, it is also important that any shared resources do not exhibit
    /// unbounded growth on repeated calls to `poll_execute_ready`.
    ///
    /// Unwinds may happen without consulting `poll_execute_ready` first.
    ///
    /// The default implementation ignores both arguments and immediately returns
    /// `Poll::Ready(Ok(()))`.
    fn poll_execute_ready(
        &mut self,
        _cx: &mut Context<'_>,
        _input: ExecInput,
    ) -> Poll<Result<(), StageError>> {
        Poll::Ready(Ok(()))
    }

    /// Execute the stage.
    ///
    /// It is expected that the stage will write all necessary data to the database
    /// upon invoking this method.
    ///
    /// Receives the `provider` to work with and the [`ExecInput`] describing the target and the
    /// previous checkpoint; returns an [`ExecOutput`] on success.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError>;

    /// Post execution commit hook.
    ///
    /// This is called after the stage has been executed and the data has been committed by the
    /// provider. The stage may want to pass some data from [`Self::execute`] via the internal
    /// field.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn post_execute_commit(&mut self) -> Result<(), StageError> {
        Ok(())
    }

    /// Unwind the stage.
    ///
    /// Receives the `provider` to work with and the [`UnwindInput`] describing the current
    /// checkpoint and the block to unwind to; returns an [`UnwindOutput`] on success.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError>;

    /// Post unwind commit hook.
    ///
    /// This is called after the stage has been unwound and the data has been committed by the
    /// provider. The stage may want to pass some data from [`Self::unwind`] via the internal
    /// field.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn post_unwind_commit(&mut self) -> Result<(), StageError> {
        Ok(())
    }
}
266
267/// [Stage] trait extension.
268pub trait StageExt<Provider>: Stage<Provider> {
269    /// Utility extension for the `Stage` trait that invokes `Stage::poll_execute_ready`
270    /// with [`poll_fn`] context. For more information see [`Stage::poll_execute_ready`].
271    fn execute_ready(
272        &mut self,
273        input: ExecInput,
274    ) -> impl Future<Output = Result<(), StageError>> + Send {
275        poll_fn(move |cx| self.poll_execute_ready(cx, input))
276    }
277}
278
/// Blanket implementation: every type implementing [`Stage`], including unsized ones, gets
/// the [`StageExt`] methods.
impl<Provider, S: Stage<Provider> + ?Sized> StageExt<Provider> for S {}