reth_stages_api/
stage.rs

1use crate::{error::StageError, StageCheckpoint, StageId};
2use alloy_primitives::{BlockNumber, TxNumber};
3use reth_provider::{BlockReader, ProviderError};
4use std::{
5    cmp::{max, min},
6    future::{poll_fn, Future},
7    ops::{Range, RangeInclusive},
8    task::{Context, Poll},
9};
10
11/// Stage execution input, see [`Stage::execute`].
12#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
13pub struct ExecInput {
14    /// The target block number the stage needs to execute towards.
15    pub target: Option<BlockNumber>,
16    /// The checkpoint of this stage the last time it was executed.
17    pub checkpoint: Option<StageCheckpoint>,
18}
19
20impl ExecInput {
21    /// Return the checkpoint of the stage or default.
22    pub fn checkpoint(&self) -> StageCheckpoint {
23        self.checkpoint.unwrap_or_default()
24    }
25
26    /// Return the next block number after the current
27    /// +1 is needed to skip the present block and always start from block number 1, not 0.
28    pub fn next_block(&self) -> BlockNumber {
29        let current_block = self.checkpoint();
30        current_block.block_number + 1
31    }
32
33    /// Returns `true` if the target block number has already been reached.
34    pub fn target_reached(&self) -> bool {
35        self.checkpoint().block_number >= self.target()
36    }
37
38    /// Return the target block number or default.
39    pub fn target(&self) -> BlockNumber {
40        self.target.unwrap_or_default()
41    }
42
43    /// Return next block range that needs to be executed.
44    pub fn next_block_range(&self) -> RangeInclusive<BlockNumber> {
45        let (range, _) = self.next_block_range_with_threshold(u64::MAX);
46        range
47    }
48
49    /// Return true if this is the first block range to execute.
50    pub const fn is_first_range(&self) -> bool {
51        self.checkpoint.is_none()
52    }
53
54    /// Return the next block range to execute.
55    /// Return pair of the block range and if this is final block range.
56    pub fn next_block_range_with_threshold(
57        &self,
58        threshold: u64,
59    ) -> (RangeInclusive<BlockNumber>, bool) {
60        let current_block = self.checkpoint();
61        let start = current_block.block_number + 1;
62        let target = self.target();
63
64        let end = min(target, current_block.block_number.saturating_add(threshold));
65
66        let is_final_range = end == target;
67        (start..=end, is_final_range)
68    }
69
70    /// Return the next block range determined the number of transactions within it.
71    /// This function walks the block indices until either the end of the range is reached or
72    /// the number of transactions exceeds the threshold.
73    pub fn next_block_range_with_transaction_threshold<Provider>(
74        &self,
75        provider: &Provider,
76        tx_threshold: u64,
77    ) -> Result<(Range<TxNumber>, RangeInclusive<BlockNumber>, bool), StageError>
78    where
79        Provider: BlockReader,
80    {
81        let start_block = self.next_block();
82        let target_block = self.target();
83
84        let start_block_body = provider
85            .block_body_indices(start_block)?
86            .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?;
87        let first_tx_num = start_block_body.first_tx_num();
88
89        let target_block_body = provider
90            .block_body_indices(target_block)?
91            .ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?;
92
93        // number of transactions left to execute.
94        let all_tx_cnt = target_block_body.next_tx_num() - first_tx_num;
95
96        if all_tx_cnt == 0 {
97            // if there is no more transaction return back.
98            return Ok((first_tx_num..first_tx_num, start_block..=target_block, true))
99        }
100
101        // get block of this tx
102        let (end_block, is_final_range, next_tx_num) = if all_tx_cnt <= tx_threshold {
103            (target_block, true, target_block_body.next_tx_num())
104        } else {
105            // get tx block number. next_tx_num in this case will be less than all_tx_cnt.
106            // So we are sure that transaction must exist.
107            let end_block_number = provider
108                .transaction_block(first_tx_num + tx_threshold)?
109                .expect("block of tx must exist");
110            // we want to get range of all transactions of this block, so we are fetching block
111            // body.
112            let end_block_body = provider
113                .block_body_indices(end_block_number)?
114                .ok_or(ProviderError::BlockBodyIndicesNotFound(target_block))?;
115            (end_block_number, false, end_block_body.next_tx_num())
116        };
117
118        let tx_range = first_tx_num..next_tx_num;
119        Ok((tx_range, start_block..=end_block, is_final_range))
120    }
121}
122
123/// Stage unwind input, see [`Stage::unwind`].
124#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
125pub struct UnwindInput {
126    /// The current highest checkpoint of the stage.
127    pub checkpoint: StageCheckpoint,
128    /// The block to unwind to.
129    pub unwind_to: BlockNumber,
130    /// The bad block that caused the unwind, if any.
131    pub bad_block: Option<BlockNumber>,
132}
133
134impl UnwindInput {
135    /// Return next block range that needs to be unwound.
136    pub fn unwind_block_range(&self) -> RangeInclusive<BlockNumber> {
137        self.unwind_block_range_with_threshold(u64::MAX).0
138    }
139
140    /// Return the next block range to unwind and the block we're unwinding to.
141    pub fn unwind_block_range_with_threshold(
142        &self,
143        threshold: u64,
144    ) -> (RangeInclusive<BlockNumber>, BlockNumber, bool) {
145        // +1 is to skip the block we're unwinding to
146        let mut start = self.unwind_to + 1;
147        let end = self.checkpoint;
148
149        start = max(start, end.block_number.saturating_sub(threshold));
150
151        let unwind_to = start - 1;
152
153        let is_final_range = unwind_to == self.unwind_to;
154        (start..=end.block_number, unwind_to, is_final_range)
155    }
156}
157
158/// The output of a stage execution.
159#[derive(Debug, PartialEq, Eq, Clone)]
160pub struct ExecOutput {
161    /// How far the stage got.
162    pub checkpoint: StageCheckpoint,
163    /// Whether or not the stage is done.
164    pub done: bool,
165}
166
167impl ExecOutput {
168    /// Mark the stage as done, checkpointing at the given place.
169    pub const fn done(checkpoint: StageCheckpoint) -> Self {
170        Self { checkpoint, done: true }
171    }
172}
173
174/// The output of a stage unwinding.
175#[derive(Debug, PartialEq, Eq, Clone)]
176pub struct UnwindOutput {
177    /// The checkpoint at which the stage has unwound to.
178    pub checkpoint: StageCheckpoint,
179}
180
181/// A stage is a segmented part of the syncing process of the node.
182///
183/// Each stage takes care of a well-defined task, such as downloading headers or executing
184/// transactions, and persist their results to a database.
185///
186/// Stages must have a unique [ID][StageId] and implement a way to "roll forwards"
187/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]).
188///
189/// Stages are executed as part of a pipeline where they are executed serially.
190///
191/// Stages receive [`DBProvider`](reth_provider::DBProvider).
192#[auto_impl::auto_impl(Box)]
193pub trait Stage<Provider>: Send + Sync {
194    /// Get the ID of the stage.
195    ///
196    /// Stage IDs must be unique.
197    fn id(&self) -> StageId;
198
199    /// Returns `Poll::Ready(Ok(()))` when the stage is ready to execute the given range.
200    ///
201    /// This method is heavily inspired by [tower](https://crates.io/crates/tower)'s `Service` trait.
202    /// Any asynchronous tasks or communication should be handled in `poll_execute_ready`, e.g.
203    /// moving downloaded items from downloaders to an internal buffer in the stage.
204    ///
205    /// If the stage has any pending external state, then `Poll::Pending` is returned.
206    ///
207    /// If `Poll::Ready(Err(_))` is returned, the stage may not be able to execute anymore
208    /// depending on the specific error. In that case, an unwind must be issued instead.
209    ///
210    /// Once `Poll::Ready(Ok(()))` is returned, the stage may be executed once using `execute`.
211    /// Until the stage has been executed, repeated calls to `poll_execute_ready` must return either
212    /// `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))`.
213    ///
214    /// Note that `poll_execute_ready` may reserve shared resources that are consumed in a
215    /// subsequent call of `execute`, e.g. internal buffers. It is crucial for implementations
216    /// to not assume that `execute` will always be invoked and to ensure that those resources
217    /// are appropriately released if the stage is dropped before `execute` is called.
218    ///
219    /// For the same reason, it is also important that any shared resources do not exhibit
220    /// unbounded growth on repeated calls to `poll_execute_ready`.
221    ///
222    /// Unwinds may happen without consulting `poll_execute_ready` first.
223    fn poll_execute_ready(
224        &mut self,
225        _cx: &mut Context<'_>,
226        _input: ExecInput,
227    ) -> Poll<Result<(), StageError>> {
228        Poll::Ready(Ok(()))
229    }
230
231    /// Execute the stage.
232    /// It is expected that the stage will write all necessary data to the database
233    /// upon invoking this method.
234    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError>;
235
236    /// Post execution commit hook.
237    ///
238    /// This is called after the stage has been executed and the data has been committed by the
239    /// provider. The stage may want to pass some data from [`Self::execute`] via the internal
240    /// field.
241    fn post_execute_commit(&mut self) -> Result<(), StageError> {
242        Ok(())
243    }
244
245    /// Unwind the stage.
246    fn unwind(
247        &mut self,
248        provider: &Provider,
249        input: UnwindInput,
250    ) -> Result<UnwindOutput, StageError>;
251
252    /// Post unwind commit hook.
253    ///
254    /// This is called after the stage has been unwound and the data has been committed by the
255    /// provider. The stage may want to pass some data from [`Self::unwind`] via the internal
256    /// field.
257    fn post_unwind_commit(&mut self) -> Result<(), StageError> {
258        Ok(())
259    }
260}
261
262/// [Stage] trait extension.
263pub trait StageExt<Provider>: Stage<Provider> {
264    /// Utility extension for the `Stage` trait that invokes `Stage::poll_execute_ready`
265    /// with [`poll_fn`] context. For more information see [`Stage::poll_execute_ready`].
266    fn execute_ready(
267        &mut self,
268        input: ExecInput,
269    ) -> impl Future<Output = Result<(), StageError>> + Send {
270        poll_fn(move |cx| self.poll_execute_ready(cx, input))
271    }
272}
273
274impl<Provider, S: Stage<Provider>> StageExt<Provider> for S {}