reth_stages/
sets.rs

1//! Built-in [`StageSet`]s.
2//!
3//! The easiest set to use is [`DefaultStages`], which provides all stages required to run an
4//! instance of reth.
5//!
6//! It is also possible to run parts of reth standalone given the required data is present in
7//! the environment, such as [`ExecutionStages`] or [`HashingStages`].
8//!
9//!
10//! # Examples
11//!
12//! ```no_run
13//! # use reth_stages::Pipeline;
14//! # use reth_stages::sets::{OfflineStages};
15//! # use reth_chainspec::MAINNET;
16//! # use reth_prune_types::PruneModes;
17//! # use reth_evm_ethereum::EthEvmConfig;
18//! # use reth_provider::StaticFileProviderFactory;
19//! # use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
20//! # use reth_static_file::StaticFileProducer;
21//! # use reth_config::config::StageConfig;
22//! # use reth_evm::execute::BlockExecutorProvider;
23//! # use reth_ethereum_primitives::EthPrimitives;
24//! # use std::sync::Arc;
25//! # use reth_consensus::{FullConsensus, ConsensusError};
26//!
27//! # fn create(exec: impl BlockExecutorProvider<Primitives = EthPrimitives>, consensus: impl FullConsensus<EthPrimitives, Error = ConsensusError> + 'static) {
28//!
29//! let provider_factory = create_test_provider_factory();
30//! let static_file_producer =
31//!     StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
32//! // Build a pipeline with all offline stages.
33//! let pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
34//!     .add_stages(OfflineStages::new(exec, Arc::new(consensus), StageConfig::default(), PruneModes::default()))
35//!     .build(provider_factory, static_file_producer);
36//!
37//! # }
38//! ```
39use crate::{
40    stages::{
41        AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
42        IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, PruneSenderRecoveryStage,
43        PruneStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage,
44    },
45    StageSet, StageSetBuilder,
46};
47use alloy_primitives::B256;
48use reth_config::config::StageConfig;
49use reth_consensus::{Consensus, ConsensusError, FullConsensus};
50use reth_evm::execute::BlockExecutorProvider;
51use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
52use reth_primitives_traits::{Block, NodePrimitives};
53use reth_provider::HeaderSyncGapProvider;
54use reth_prune_types::PruneModes;
55use reth_stages_api::Stage;
56use std::{ops::Not, sync::Arc};
57use tokio::sync::watch;
58
/// A set containing all stages to run a fully syncing instance of reth.
///
/// A combination of (in order)
///
/// - [`OnlineStages`]
/// - [`OfflineStages`]
/// - [`FinishStage`]
///
/// This expands to the following series of stages:
/// - [`HeaderStage`]
/// - [`BodyStage`]
/// - [`SenderRecoveryStage`]
/// - [`ExecutionStage`]
/// - [`PruneSenderRecoveryStage`] (execute)
/// - [`MerkleStage`] (unwind)
/// - [`AccountHashingStage`]
/// - [`StorageHashingStage`]
/// - [`MerkleStage`] (execute)
/// - [`TransactionLookupStage`]
/// - [`IndexStorageHistoryStage`]
/// - [`IndexAccountHistoryStage`]
/// - [`PruneStage`] (execute)
/// - [`FinishStage`]
///
/// [`PruneSenderRecoveryStage`] and [`PruneStage`] are passed to the builder as optional stages
/// by [`OfflineStages`], depending on the configured [`PruneModes`].
#[derive(Debug)]
pub struct DefaultStages<Provider, H, B, E>
where
    H: HeaderDownloader,
    B: BodyDownloader,
    E: BlockExecutorProvider,
{
    /// The online stages, assembled up front by [`DefaultStages::new`].
    online: OnlineStages<Provider, H, B>,
    /// Executor provider handed to the offline stages for block execution.
    executor_provider: E,
    /// Full consensus instance handed to the offline stages.
    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
    /// Configuration for each stage in the pipeline.
    stages_config: StageConfig,
    /// Prune configuration for every segment that can be pruned.
    prune_modes: PruneModes,
}
100
101impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
102where
103    H: HeaderDownloader,
104    B: BodyDownloader,
105    E: BlockExecutorProvider<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
106{
107    /// Create a new set of default stages with default values.
108    #[allow(clippy::too_many_arguments)]
109    pub fn new(
110        provider: Provider,
111        tip: watch::Receiver<B256>,
112        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
113        header_downloader: H,
114        body_downloader: B,
115        executor_provider: E,
116        stages_config: StageConfig,
117        prune_modes: PruneModes,
118    ) -> Self {
119        Self {
120            online: OnlineStages::new(
121                provider,
122                tip,
123                consensus.clone().as_consensus(),
124                header_downloader,
125                body_downloader,
126                stages_config.clone(),
127            ),
128            executor_provider,
129            consensus,
130            stages_config,
131            prune_modes,
132        }
133    }
134}
135
136impl<P, H, B, E> DefaultStages<P, H, B, E>
137where
138    E: BlockExecutorProvider,
139    H: HeaderDownloader,
140    B: BodyDownloader,
141{
142    /// Appends the default offline stages and default finish stage to the given builder.
143    pub fn add_offline_stages<Provider>(
144        default_offline: StageSetBuilder<Provider>,
145        executor_provider: E,
146        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
147        stages_config: StageConfig,
148        prune_modes: PruneModes,
149    ) -> StageSetBuilder<Provider>
150    where
151        OfflineStages<E>: StageSet<Provider>,
152    {
153        StageSetBuilder::default()
154            .add_set(default_offline)
155            .add_set(OfflineStages::new(executor_provider, consensus, stages_config, prune_modes))
156            .add_stage(FinishStage)
157    }
158}
159
160impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
161where
162    P: HeaderSyncGapProvider + 'static,
163    H: HeaderDownloader + 'static,
164    B: BodyDownloader + 'static,
165    E: BlockExecutorProvider,
166    OnlineStages<P, H, B>: StageSet<Provider>,
167    OfflineStages<E>: StageSet<Provider>,
168{
169    fn builder(self) -> StageSetBuilder<Provider> {
170        Self::add_offline_stages(
171            self.online.builder(),
172            self.executor_provider,
173            self.consensus,
174            self.stages_config.clone(),
175            self.prune_modes,
176        )
177    }
178}
179
/// A set containing all stages that require network access by default.
///
/// These stages *can* be run without network access if the specified downloaders are
/// themselves offline.
///
/// Expands to (in order):
///
/// - [`HeaderStage`]
/// - [`BodyStage`]
#[derive(Debug)]
pub struct OnlineStages<Provider, H, B>
where
    H: HeaderDownloader,
    B: BodyDownloader,
{
    /// Sync gap provider for the headers stage.
    provider: Provider,
    /// The tip for the headers stage.
    tip: watch::Receiver<B256>,
    /// The consensus engine used to validate incoming data; the headers stage receives it as a
    /// header validator.
    consensus: Arc<dyn Consensus<B::Block, Error = ConsensusError>>,
    /// The block header downloader.
    header_downloader: H,
    /// The block body downloader.
    body_downloader: B,
    /// Configuration for each stage in the pipeline.
    stages_config: StageConfig,
}
203
204impl<Provider, H, B> OnlineStages<Provider, H, B>
205where
206    H: HeaderDownloader,
207    B: BodyDownloader,
208{
209    /// Create a new set of online stages with default values.
210    pub fn new(
211        provider: Provider,
212        tip: watch::Receiver<B256>,
213        consensus: Arc<dyn Consensus<B::Block, Error = ConsensusError>>,
214        header_downloader: H,
215        body_downloader: B,
216        stages_config: StageConfig,
217    ) -> Self {
218        Self { provider, tip, consensus, header_downloader, body_downloader, stages_config }
219    }
220}
221
222impl<P, H, B> OnlineStages<P, H, B>
223where
224    P: HeaderSyncGapProvider + 'static,
225    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
226    B: BodyDownloader + 'static,
227{
228    /// Create a new builder using the given headers stage.
229    pub fn builder_with_headers<Provider>(
230        headers: HeaderStage<P, H>,
231        body_downloader: B,
232    ) -> StageSetBuilder<Provider>
233    where
234        HeaderStage<P, H>: Stage<Provider>,
235        BodyStage<B>: Stage<Provider>,
236    {
237        StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
238    }
239
240    /// Create a new builder using the given bodies stage.
241    pub fn builder_with_bodies<Provider>(
242        bodies: BodyStage<B>,
243        provider: P,
244        tip: watch::Receiver<B256>,
245        header_downloader: H,
246        consensus: Arc<dyn Consensus<B::Block, Error = ConsensusError>>,
247        stages_config: StageConfig,
248    ) -> StageSetBuilder<Provider>
249    where
250        BodyStage<B>: Stage<Provider>,
251        HeaderStage<P, H>: Stage<Provider>,
252    {
253        StageSetBuilder::default()
254            .add_stage(HeaderStage::new(
255                provider,
256                header_downloader,
257                tip,
258                consensus.clone().as_header_validator(),
259                stages_config.etl,
260            ))
261            .add_stage(bodies)
262    }
263}
264
265impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
266where
267    P: HeaderSyncGapProvider + 'static,
268    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
269    B: BodyDownloader + 'static,
270    HeaderStage<P, H>: Stage<Provider>,
271    BodyStage<B>: Stage<Provider>,
272{
273    fn builder(self) -> StageSetBuilder<Provider> {
274        StageSetBuilder::default()
275            .add_stage(HeaderStage::new(
276                self.provider,
277                self.header_downloader,
278                self.tip,
279                self.consensus.clone().as_header_validator(),
280                self.stages_config.etl.clone(),
281            ))
282            .add_stage(BodyStage::new(self.body_downloader))
283    }
284}
285
/// A set containing all stages that do not require network access.
///
/// A combination of (in order)
///
/// - [`ExecutionStages`]
/// - [`PruneSenderRecoveryStage`]
/// - [`HashingStages`]
/// - [`HistoryIndexingStages`]
/// - [`PruneStage`]
///
/// [`PruneSenderRecoveryStage`] and [`PruneStage`] are passed to the builder as optional
/// stages, depending on the configured [`PruneModes`].
#[derive(Debug)]
#[non_exhaustive]
pub struct OfflineStages<E: BlockExecutorProvider> {
    /// Executor provider handed to the execution stages.
    executor_provider: E,
    /// Consensus instance for validating blocks.
    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
    /// Configuration for each stage in the pipeline.
    stages_config: StageConfig,
    /// Prune configuration for every segment that can be pruned.
    prune_modes: PruneModes,
}
307
308impl<E: BlockExecutorProvider> OfflineStages<E> {
309    /// Create a new set of offline stages with default values.
310    pub const fn new(
311        executor_provider: E,
312        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
313        stages_config: StageConfig,
314        prune_modes: PruneModes,
315    ) -> Self {
316        Self { executor_provider, consensus, stages_config, prune_modes }
317    }
318}
319
320impl<E, Provider> StageSet<Provider> for OfflineStages<E>
321where
322    E: BlockExecutorProvider,
323    ExecutionStages<E>: StageSet<Provider>,
324    PruneSenderRecoveryStage: Stage<Provider>,
325    HashingStages: StageSet<Provider>,
326    HistoryIndexingStages: StageSet<Provider>,
327    PruneStage: Stage<Provider>,
328{
329    fn builder(self) -> StageSetBuilder<Provider> {
330        ExecutionStages::new(self.executor_provider, self.consensus, self.stages_config.clone())
331            .builder()
332            // If sender recovery prune mode is set, add the prune sender recovery stage.
333            .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
334                PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
335            }))
336            .add_set(HashingStages { stages_config: self.stages_config.clone() })
337            .add_set(HistoryIndexingStages {
338                stages_config: self.stages_config.clone(),
339                prune_modes: self.prune_modes.clone(),
340            })
341            // If any prune modes are set, add the prune stage.
342            .add_stage_opt(self.prune_modes.is_empty().not().then(|| {
343                // Prune stage should be added after all hashing stages, because otherwise it will
344                // delete
345                PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
346            }))
347    }
348}
349
/// A set containing all stages that are required to execute pre-existing block data.
///
/// A combination of (in order)
///
/// - [`SenderRecoveryStage`]
/// - [`ExecutionStage`]
#[derive(Debug)]
#[non_exhaustive]
pub struct ExecutionStages<E: BlockExecutorProvider> {
    /// Executor provider handed to the execution stage.
    executor_provider: E,
    /// Consensus instance for validating blocks.
    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
    /// Configuration for each stage in the pipeline.
    stages_config: StageConfig,
}
361
362impl<E: BlockExecutorProvider> ExecutionStages<E> {
363    /// Create a new set of execution stages with default values.
364    pub const fn new(
365        executor_provider: E,
366        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
367        stages_config: StageConfig,
368    ) -> Self {
369        Self { executor_provider, consensus, stages_config }
370    }
371}
372
373impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
374where
375    E: BlockExecutorProvider,
376    SenderRecoveryStage: Stage<Provider>,
377    ExecutionStage<E>: Stage<Provider>,
378{
379    fn builder(self) -> StageSetBuilder<Provider> {
380        StageSetBuilder::default()
381            .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
382            .add_stage(ExecutionStage::from_config(
383                self.executor_provider,
384                self.consensus,
385                self.stages_config.execution,
386                self.stages_config.execution_external_clean_threshold(),
387            ))
388    }
389}
390
/// A set containing all stages that hash account state.
///
/// A combination of (in order)
///
/// - [`MerkleStage`] (unwind)
/// - [`AccountHashingStage`]
/// - [`StorageHashingStage`]
/// - [`MerkleStage`] (execute)
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HashingStages {
    /// Configuration for each stage in the pipeline.
    stages_config: StageConfig,
}
398
399impl<Provider> StageSet<Provider> for HashingStages
400where
401    MerkleStage: Stage<Provider>,
402    AccountHashingStage: Stage<Provider>,
403    StorageHashingStage: Stage<Provider>,
404{
405    fn builder(self) -> StageSetBuilder<Provider> {
406        StageSetBuilder::default()
407            .add_stage(MerkleStage::default_unwind())
408            .add_stage(AccountHashingStage::new(
409                self.stages_config.account_hashing,
410                self.stages_config.etl.clone(),
411            ))
412            .add_stage(StorageHashingStage::new(
413                self.stages_config.storage_hashing,
414                self.stages_config.etl.clone(),
415            ))
416            .add_stage(MerkleStage::new_execution(self.stages_config.merkle.clean_threshold))
417    }
418}
419
/// A set containing all stages that do additional indexing for historical state.
///
/// A combination of (in order)
///
/// - [`TransactionLookupStage`]
/// - [`IndexStorageHistoryStage`]
/// - [`IndexAccountHistoryStage`]
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HistoryIndexingStages {
    /// Configuration for each stage in the pipeline.
    stages_config: StageConfig,
    /// Prune configuration for every segment that can be pruned.
    prune_modes: PruneModes,
}
429
430impl<Provider> StageSet<Provider> for HistoryIndexingStages
431where
432    TransactionLookupStage: Stage<Provider>,
433    IndexStorageHistoryStage: Stage<Provider>,
434    IndexAccountHistoryStage: Stage<Provider>,
435{
436    fn builder(self) -> StageSetBuilder<Provider> {
437        StageSetBuilder::default()
438            .add_stage(TransactionLookupStage::new(
439                self.stages_config.transaction_lookup,
440                self.stages_config.etl.clone(),
441                self.prune_modes.transaction_lookup,
442            ))
443            .add_stage(IndexStorageHistoryStage::new(
444                self.stages_config.index_storage_history,
445                self.stages_config.etl.clone(),
446                self.prune_modes.account_history,
447            ))
448            .add_stage(IndexAccountHistoryStage::new(
449                self.stages_config.index_account_history,
450                self.stages_config.etl.clone(),
451                self.prune_modes.storage_history,
452            ))
453    }
454}