reth_stages/
sets.rs

1//! Built-in [`StageSet`]s.
2//!
3//! The easiest set to use is [`DefaultStages`], which provides all stages required to run an
4//! instance of reth.
5//!
6//! It is also possible to run parts of reth standalone given the required data is present in
7//! the environment, such as [`ExecutionStages`] or [`HashingStages`].
8//!
9//!
10//! # Examples
11//!
12//! ```no_run
13//! # use reth_stages::Pipeline;
14//! # use reth_stages::sets::{OfflineStages};
15//! # use reth_chainspec::MAINNET;
16//! # use reth_prune_types::PruneModes;
17//! # use reth_evm_ethereum::EthEvmConfig;
18//! # use reth_evm::ConfigureEvm;
19//! # use reth_provider::StaticFileProviderFactory;
20//! # use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
21//! # use reth_static_file::StaticFileProducer;
22//! # use reth_config::config::StageConfig;
23//! # use reth_ethereum_primitives::EthPrimitives;
24//! # use std::sync::Arc;
25//! # use reth_consensus::{FullConsensus, ConsensusError};
26//!
27//! # fn create(exec: impl ConfigureEvm<Primitives = EthPrimitives> + 'static, consensus: impl FullConsensus<EthPrimitives, Error = ConsensusError> + 'static) {
28//!
29//! let provider_factory = create_test_provider_factory();
30//! let static_file_producer =
31//!     StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
32//! // Build a pipeline with all offline stages.
33//! let pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
34//!     .add_stages(OfflineStages::new(exec, Arc::new(consensus), StageConfig::default(), PruneModes::default()))
35//!     .build(provider_factory, static_file_producer);
36//!
37//! # }
38//! ```
39use crate::{
40    stages::{
41        AccountHashingStage, BodyStage, EraImportSource, EraStage, ExecutionStage, FinishStage,
42        HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage,
43        PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage, StorageHashingStage,
44        TransactionLookupStage,
45    },
46    StageSet, StageSetBuilder,
47};
48use alloy_primitives::B256;
49use reth_config::config::StageConfig;
50use reth_consensus::{ConsensusError, FullConsensus};
51use reth_evm::ConfigureEvm;
52use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
53use reth_primitives_traits::{Block, NodePrimitives};
54use reth_provider::HeaderSyncGapProvider;
55use reth_prune_types::PruneModes;
56use reth_stages_api::Stage;
57use std::{ops::Not, sync::Arc};
58use tokio::sync::watch;
59
60/// A set containing all stages to run a fully syncing instance of reth.
61///
62/// A combination of (in order)
63///
64/// - [`OnlineStages`]
65/// - [`OfflineStages`]
66/// - [`FinishStage`]
67///
68/// This expands to the following series of stages:
69/// - [`HeaderStage`]
70/// - [`BodyStage`]
71/// - [`SenderRecoveryStage`]
72/// - [`ExecutionStage`]
73/// - [`PruneSenderRecoveryStage`] (execute)
74/// - [`MerkleStage`] (unwind)
75/// - [`AccountHashingStage`]
76/// - [`StorageHashingStage`]
77/// - [`MerkleStage`] (execute)
78/// - [`TransactionLookupStage`]
79/// - [`IndexStorageHistoryStage`]
80/// - [`IndexAccountHistoryStage`]
81/// - [`PruneStage`] (execute)
82/// - [`FinishStage`]
83#[derive(Debug)]
84pub struct DefaultStages<Provider, H, B, E>
85where
86    H: HeaderDownloader,
87    B: BodyDownloader,
88    E: ConfigureEvm,
89{
90    /// Configuration for the online stages
91    online: OnlineStages<Provider, H, B>,
92    /// Executor factory needs for execution stage
93    evm_config: E,
94    /// Consensus instance
95    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
96    /// Configuration for each stage in the pipeline
97    stages_config: StageConfig,
98    /// Prune configuration for every segment that can be pruned
99    prune_modes: PruneModes,
100}
101
102impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
103where
104    H: HeaderDownloader,
105    B: BodyDownloader,
106    E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
107{
108    /// Create a new set of default stages with default values.
109    #[expect(clippy::too_many_arguments)]
110    pub fn new(
111        provider: Provider,
112        tip: watch::Receiver<B256>,
113        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
114        header_downloader: H,
115        body_downloader: B,
116        evm_config: E,
117        stages_config: StageConfig,
118        prune_modes: PruneModes,
119        era_import_source: Option<EraImportSource>,
120    ) -> Self {
121        Self {
122            online: OnlineStages::new(
123                provider,
124                tip,
125                header_downloader,
126                body_downloader,
127                stages_config.clone(),
128                era_import_source,
129            ),
130            evm_config,
131            consensus,
132            stages_config,
133            prune_modes,
134        }
135    }
136}
137
138impl<P, H, B, E> DefaultStages<P, H, B, E>
139where
140    E: ConfigureEvm,
141    H: HeaderDownloader,
142    B: BodyDownloader,
143{
144    /// Appends the default offline stages and default finish stage to the given builder.
145    pub fn add_offline_stages<Provider>(
146        default_offline: StageSetBuilder<Provider>,
147        evm_config: E,
148        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
149        stages_config: StageConfig,
150        prune_modes: PruneModes,
151    ) -> StageSetBuilder<Provider>
152    where
153        OfflineStages<E>: StageSet<Provider>,
154    {
155        StageSetBuilder::default()
156            .add_set(default_offline)
157            .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
158            .add_stage(FinishStage)
159    }
160}
161
162impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
163where
164    P: HeaderSyncGapProvider + 'static,
165    H: HeaderDownloader + 'static,
166    B: BodyDownloader + 'static,
167    E: ConfigureEvm,
168    OnlineStages<P, H, B>: StageSet<Provider>,
169    OfflineStages<E>: StageSet<Provider>,
170{
171    fn builder(self) -> StageSetBuilder<Provider> {
172        Self::add_offline_stages(
173            self.online.builder(),
174            self.evm_config,
175            self.consensus,
176            self.stages_config.clone(),
177            self.prune_modes,
178        )
179    }
180}
181
182/// A set containing all stages that require network access by default.
183///
184/// These stages *can* be run without network access if the specified downloaders are
185/// themselves offline.
186#[derive(Debug)]
187pub struct OnlineStages<Provider, H, B>
188where
189    H: HeaderDownloader,
190    B: BodyDownloader,
191{
192    /// Sync gap provider for the headers stage.
193    provider: Provider,
194    /// The tip for the headers stage.
195    tip: watch::Receiver<B256>,
196
197    /// The block header downloader
198    header_downloader: H,
199    /// The block body downloader
200    body_downloader: B,
201    /// Configuration for each stage in the pipeline
202    stages_config: StageConfig,
203    /// Optional source of ERA1 files. The `EraStage` does nothing unless this is specified.
204    era_import_source: Option<EraImportSource>,
205}
206
207impl<Provider, H, B> OnlineStages<Provider, H, B>
208where
209    H: HeaderDownloader,
210    B: BodyDownloader,
211{
212    /// Create a new set of online stages with default values.
213    pub const fn new(
214        provider: Provider,
215        tip: watch::Receiver<B256>,
216        header_downloader: H,
217        body_downloader: B,
218        stages_config: StageConfig,
219        era_import_source: Option<EraImportSource>,
220    ) -> Self {
221        Self { provider, tip, header_downloader, body_downloader, stages_config, era_import_source }
222    }
223}
224
225impl<P, H, B> OnlineStages<P, H, B>
226where
227    P: HeaderSyncGapProvider + 'static,
228    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
229    B: BodyDownloader + 'static,
230{
231    /// Create a new builder using the given headers stage.
232    pub fn builder_with_headers<Provider>(
233        headers: HeaderStage<P, H>,
234        body_downloader: B,
235    ) -> StageSetBuilder<Provider>
236    where
237        HeaderStage<P, H>: Stage<Provider>,
238        BodyStage<B>: Stage<Provider>,
239    {
240        StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
241    }
242
243    /// Create a new builder using the given bodies stage.
244    pub fn builder_with_bodies<Provider>(
245        bodies: BodyStage<B>,
246        provider: P,
247        tip: watch::Receiver<B256>,
248        header_downloader: H,
249        stages_config: StageConfig,
250    ) -> StageSetBuilder<Provider>
251    where
252        BodyStage<B>: Stage<Provider>,
253        HeaderStage<P, H>: Stage<Provider>,
254    {
255        StageSetBuilder::default()
256            .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
257            .add_stage(bodies)
258    }
259}
260
261impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
262where
263    P: HeaderSyncGapProvider + 'static,
264    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
265    B: BodyDownloader + 'static,
266    HeaderStage<P, H>: Stage<Provider>,
267    BodyStage<B>: Stage<Provider>,
268    EraStage<<B::Block as Block>::Header, <B::Block as Block>::Body, EraImportSource>:
269        Stage<Provider>,
270{
271    fn builder(self) -> StageSetBuilder<Provider> {
272        StageSetBuilder::default()
273            .add_stage(EraStage::new(self.era_import_source, self.stages_config.etl.clone()))
274            .add_stage(HeaderStage::new(
275                self.provider,
276                self.header_downloader,
277                self.tip,
278                self.stages_config.etl.clone(),
279            ))
280            .add_stage(BodyStage::new(self.body_downloader))
281    }
282}
283
284/// A set containing all stages that do not require network access.
285///
286/// A combination of (in order)
287///
288/// - [`ExecutionStages`]
289/// - [`PruneSenderRecoveryStage`]
290/// - [`HashingStages`]
291/// - [`HistoryIndexingStages`]
292/// - [`PruneStage`]
293#[derive(Debug)]
294#[non_exhaustive]
295pub struct OfflineStages<E: ConfigureEvm> {
296    /// Executor factory needs for execution stage
297    evm_config: E,
298    /// Consensus instance for validating blocks.
299    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
300    /// Configuration for each stage in the pipeline
301    stages_config: StageConfig,
302    /// Prune configuration for every segment that can be pruned
303    prune_modes: PruneModes,
304}
305
306impl<E: ConfigureEvm> OfflineStages<E> {
307    /// Create a new set of offline stages with default values.
308    pub const fn new(
309        evm_config: E,
310        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
311        stages_config: StageConfig,
312        prune_modes: PruneModes,
313    ) -> Self {
314        Self { evm_config, consensus, stages_config, prune_modes }
315    }
316}
317
318impl<E, Provider> StageSet<Provider> for OfflineStages<E>
319where
320    E: ConfigureEvm,
321    ExecutionStages<E>: StageSet<Provider>,
322    PruneSenderRecoveryStage: Stage<Provider>,
323    HashingStages: StageSet<Provider>,
324    HistoryIndexingStages: StageSet<Provider>,
325    PruneStage: Stage<Provider>,
326{
327    fn builder(self) -> StageSetBuilder<Provider> {
328        ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
329            .builder()
330            // If sender recovery prune mode is set, add the prune sender recovery stage.
331            .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
332                PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
333            }))
334            .add_set(HashingStages { stages_config: self.stages_config.clone() })
335            .add_set(HistoryIndexingStages {
336                stages_config: self.stages_config.clone(),
337                prune_modes: self.prune_modes.clone(),
338            })
339            // If any prune modes are set, add the prune stage.
340            .add_stage_opt(self.prune_modes.is_empty().not().then(|| {
341                // Prune stage should be added after all hashing stages, because otherwise it will
342                // delete
343                PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
344            }))
345    }
346}
347
348/// A set containing all stages that are required to execute pre-existing block data.
349#[derive(Debug)]
350#[non_exhaustive]
351pub struct ExecutionStages<E: ConfigureEvm> {
352    /// Executor factory that will create executors.
353    evm_config: E,
354    /// Consensus instance for validating blocks.
355    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
356    /// Configuration for each stage in the pipeline
357    stages_config: StageConfig,
358}
359
360impl<E: ConfigureEvm> ExecutionStages<E> {
361    /// Create a new set of execution stages with default values.
362    pub const fn new(
363        executor_provider: E,
364        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
365        stages_config: StageConfig,
366    ) -> Self {
367        Self { evm_config: executor_provider, consensus, stages_config }
368    }
369}
370
371impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
372where
373    E: ConfigureEvm + 'static,
374    SenderRecoveryStage: Stage<Provider>,
375    ExecutionStage<E>: Stage<Provider>,
376{
377    fn builder(self) -> StageSetBuilder<Provider> {
378        StageSetBuilder::default()
379            .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
380            .add_stage(ExecutionStage::from_config(
381                self.evm_config,
382                self.consensus,
383                self.stages_config.execution,
384                self.stages_config.execution_external_clean_threshold(),
385            ))
386    }
387}
388
389/// A set containing all stages that hash account state.
390#[derive(Debug, Default)]
391#[non_exhaustive]
392pub struct HashingStages {
393    /// Configuration for each stage in the pipeline
394    stages_config: StageConfig,
395}
396
397impl<Provider> StageSet<Provider> for HashingStages
398where
399    MerkleStage: Stage<Provider>,
400    AccountHashingStage: Stage<Provider>,
401    StorageHashingStage: Stage<Provider>,
402{
403    fn builder(self) -> StageSetBuilder<Provider> {
404        StageSetBuilder::default()
405            .add_stage(MerkleStage::default_unwind())
406            .add_stage(AccountHashingStage::new(
407                self.stages_config.account_hashing,
408                self.stages_config.etl.clone(),
409            ))
410            .add_stage(StorageHashingStage::new(
411                self.stages_config.storage_hashing,
412                self.stages_config.etl.clone(),
413            ))
414            .add_stage(MerkleStage::new_execution(
415                self.stages_config.merkle.rebuild_threshold,
416                self.stages_config.merkle.incremental_threshold,
417            ))
418    }
419}
420
421/// A set containing all stages that do additional indexing for historical state.
422#[derive(Debug, Default)]
423#[non_exhaustive]
424pub struct HistoryIndexingStages {
425    /// Configuration for each stage in the pipeline
426    stages_config: StageConfig,
427    /// Prune configuration for every segment that can be pruned
428    prune_modes: PruneModes,
429}
430
431impl<Provider> StageSet<Provider> for HistoryIndexingStages
432where
433    TransactionLookupStage: Stage<Provider>,
434    IndexStorageHistoryStage: Stage<Provider>,
435    IndexAccountHistoryStage: Stage<Provider>,
436{
437    fn builder(self) -> StageSetBuilder<Provider> {
438        StageSetBuilder::default()
439            .add_stage(TransactionLookupStage::new(
440                self.stages_config.transaction_lookup,
441                self.stages_config.etl.clone(),
442                self.prune_modes.transaction_lookup,
443            ))
444            .add_stage(IndexStorageHistoryStage::new(
445                self.stages_config.index_storage_history,
446                self.stages_config.etl.clone(),
447                self.prune_modes.storage_history,
448            ))
449            .add_stage(IndexAccountHistoryStage::new(
450                self.stages_config.index_account_history,
451                self.stages_config.etl.clone(),
452                self.prune_modes.account_history,
453            ))
454    }
455}