reth_stages/
sets.rs

1//! Built-in [`StageSet`]s.
2//!
3//! The easiest set to use is [`DefaultStages`], which provides all stages required to run an
4//! instance of reth.
5//!
6//! It is also possible to run parts of reth standalone given the required data is present in
7//! the environment, such as [`ExecutionStages`] or [`HashingStages`].
8//!
9//!
10//! # Examples
11//!
12//! ```no_run
13//! # use reth_stages::Pipeline;
14//! # use reth_stages::sets::{OfflineStages};
15//! # use reth_chainspec::MAINNET;
16//! # use reth_prune_types::PruneModes;
17//! # use reth_evm_ethereum::EthEvmConfig;
18//! # use reth_evm::ConfigureEvm;
19//! # use reth_provider::StaticFileProviderFactory;
20//! # use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
21//! # use reth_static_file::StaticFileProducer;
22//! # use reth_config::config::StageConfig;
23//! # use reth_ethereum_primitives::EthPrimitives;
24//! # use std::sync::Arc;
25//! # use reth_consensus::{FullConsensus, ConsensusError};
26//!
27//! # fn create(exec: impl ConfigureEvm<Primitives = EthPrimitives> + 'static, consensus: impl FullConsensus<EthPrimitives, Error = ConsensusError> + 'static) {
28//!
29//! let provider_factory = create_test_provider_factory();
30//! let static_file_producer =
31//!     StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
32//! // Build a pipeline with all offline stages.
33//! let pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
34//!     .add_stages(OfflineStages::new(exec, Arc::new(consensus), StageConfig::default(), PruneModes::default()))
35//!     .build(provider_factory, static_file_producer);
36//!
37//! # }
38//! ```
39use crate::{
40    stages::{
41        AccountHashingStage, BodyStage, EraImportSource, EraStage, ExecutionStage, FinishStage,
42        HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleChangeSets,
43        MerkleStage, PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage,
44        StorageHashingStage, TransactionLookupStage,
45    },
46    StageSet, StageSetBuilder,
47};
48use alloy_primitives::B256;
49use reth_config::config::StageConfig;
50use reth_consensus::{ConsensusError, FullConsensus};
51use reth_evm::ConfigureEvm;
52use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
53use reth_primitives_traits::{Block, NodePrimitives};
54use reth_provider::HeaderSyncGapProvider;
55use reth_prune_types::PruneModes;
56use reth_stages_api::Stage;
57use std::sync::Arc;
58use tokio::sync::watch;
59
60/// A set containing all stages to run a fully syncing instance of reth.
61///
62/// A combination of (in order)
63///
64/// - [`OnlineStages`]
65/// - [`OfflineStages`]
66/// - [`FinishStage`]
67///
68/// This expands to the following series of stages:
69/// - [`HeaderStage`]
70/// - [`BodyStage`]
71/// - [`SenderRecoveryStage`]
72/// - [`ExecutionStage`]
73/// - [`PruneSenderRecoveryStage`] (execute)
74/// - [`MerkleStage`] (unwind)
75/// - [`AccountHashingStage`]
76/// - [`StorageHashingStage`]
77/// - [`MerkleStage`] (execute)
78/// - [`MerkleChangeSets`]
79/// - [`TransactionLookupStage`]
80/// - [`IndexStorageHistoryStage`]
81/// - [`IndexAccountHistoryStage`]
82/// - [`PruneStage`] (execute)
83/// - [`FinishStage`]
84#[derive(Debug)]
85pub struct DefaultStages<Provider, H, B, E>
86where
87    H: HeaderDownloader,
88    B: BodyDownloader,
89    E: ConfigureEvm,
90{
91    /// Configuration for the online stages
92    online: OnlineStages<Provider, H, B>,
93    /// Executor factory needs for execution stage
94    evm_config: E,
95    /// Consensus instance
96    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
97    /// Configuration for each stage in the pipeline
98    stages_config: StageConfig,
99    /// Prune configuration for every segment that can be pruned
100    prune_modes: PruneModes,
101}
102
103impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
104where
105    H: HeaderDownloader,
106    B: BodyDownloader,
107    E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
108{
109    /// Create a new set of default stages with default values.
110    #[expect(clippy::too_many_arguments)]
111    pub fn new(
112        provider: Provider,
113        tip: watch::Receiver<B256>,
114        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
115        header_downloader: H,
116        body_downloader: B,
117        evm_config: E,
118        stages_config: StageConfig,
119        prune_modes: PruneModes,
120        era_import_source: Option<EraImportSource>,
121    ) -> Self {
122        Self {
123            online: OnlineStages::new(
124                provider,
125                tip,
126                header_downloader,
127                body_downloader,
128                stages_config.clone(),
129                era_import_source,
130            ),
131            evm_config,
132            consensus,
133            stages_config,
134            prune_modes,
135        }
136    }
137}
138
139impl<P, H, B, E> DefaultStages<P, H, B, E>
140where
141    E: ConfigureEvm,
142    H: HeaderDownloader,
143    B: BodyDownloader,
144{
145    /// Appends the default offline stages and default finish stage to the given builder.
146    pub fn add_offline_stages<Provider>(
147        default_offline: StageSetBuilder<Provider>,
148        evm_config: E,
149        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
150        stages_config: StageConfig,
151        prune_modes: PruneModes,
152    ) -> StageSetBuilder<Provider>
153    where
154        OfflineStages<E>: StageSet<Provider>,
155    {
156        StageSetBuilder::default()
157            .add_set(default_offline)
158            .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
159            .add_stage(FinishStage)
160    }
161}
162
163impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
164where
165    P: HeaderSyncGapProvider + 'static,
166    H: HeaderDownloader + 'static,
167    B: BodyDownloader + 'static,
168    E: ConfigureEvm,
169    OnlineStages<P, H, B>: StageSet<Provider>,
170    OfflineStages<E>: StageSet<Provider>,
171{
172    fn builder(self) -> StageSetBuilder<Provider> {
173        Self::add_offline_stages(
174            self.online.builder(),
175            self.evm_config,
176            self.consensus,
177            self.stages_config.clone(),
178            self.prune_modes,
179        )
180    }
181}
182
183/// A set containing all stages that require network access by default.
184///
185/// These stages *can* be run without network access if the specified downloaders are
186/// themselves offline.
187#[derive(Debug)]
188pub struct OnlineStages<Provider, H, B>
189where
190    H: HeaderDownloader,
191    B: BodyDownloader,
192{
193    /// Sync gap provider for the headers stage.
194    provider: Provider,
195    /// The tip for the headers stage.
196    tip: watch::Receiver<B256>,
197
198    /// The block header downloader
199    header_downloader: H,
200    /// The block body downloader
201    body_downloader: B,
202    /// Configuration for each stage in the pipeline
203    stages_config: StageConfig,
204    /// Optional source of ERA1 files. The `EraStage` does nothing unless this is specified.
205    era_import_source: Option<EraImportSource>,
206}
207
208impl<Provider, H, B> OnlineStages<Provider, H, B>
209where
210    H: HeaderDownloader,
211    B: BodyDownloader,
212{
213    /// Create a new set of online stages with default values.
214    pub const fn new(
215        provider: Provider,
216        tip: watch::Receiver<B256>,
217        header_downloader: H,
218        body_downloader: B,
219        stages_config: StageConfig,
220        era_import_source: Option<EraImportSource>,
221    ) -> Self {
222        Self { provider, tip, header_downloader, body_downloader, stages_config, era_import_source }
223    }
224}
225
226impl<P, H, B> OnlineStages<P, H, B>
227where
228    P: HeaderSyncGapProvider + 'static,
229    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
230    B: BodyDownloader + 'static,
231{
232    /// Create a new builder using the given headers stage.
233    pub fn builder_with_headers<Provider>(
234        headers: HeaderStage<P, H>,
235        body_downloader: B,
236    ) -> StageSetBuilder<Provider>
237    where
238        HeaderStage<P, H>: Stage<Provider>,
239        BodyStage<B>: Stage<Provider>,
240    {
241        StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
242    }
243
244    /// Create a new builder using the given bodies stage.
245    pub fn builder_with_bodies<Provider>(
246        bodies: BodyStage<B>,
247        provider: P,
248        tip: watch::Receiver<B256>,
249        header_downloader: H,
250        stages_config: StageConfig,
251    ) -> StageSetBuilder<Provider>
252    where
253        BodyStage<B>: Stage<Provider>,
254        HeaderStage<P, H>: Stage<Provider>,
255    {
256        StageSetBuilder::default()
257            .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
258            .add_stage(bodies)
259    }
260}
261
262impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
263where
264    P: HeaderSyncGapProvider + 'static,
265    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
266    B: BodyDownloader + 'static,
267    HeaderStage<P, H>: Stage<Provider>,
268    BodyStage<B>: Stage<Provider>,
269    EraStage<<B::Block as Block>::Header, <B::Block as Block>::Body, EraImportSource>:
270        Stage<Provider>,
271{
272    fn builder(self) -> StageSetBuilder<Provider> {
273        let mut builder = StageSetBuilder::default();
274
275        if self.era_import_source.is_some() {
276            builder = builder
277                .add_stage(EraStage::new(self.era_import_source, self.stages_config.etl.clone()));
278        }
279
280        builder
281            .add_stage(HeaderStage::new(
282                self.provider,
283                self.header_downloader,
284                self.tip,
285                self.stages_config.etl.clone(),
286            ))
287            .add_stage(BodyStage::new(self.body_downloader))
288    }
289}
290
291/// A set containing all stages that do not require network access.
292///
293/// A combination of (in order)
294///
295/// - [`ExecutionStages`]
296/// - [`PruneSenderRecoveryStage`]
297/// - [`HashingStages`]
298/// - [`HistoryIndexingStages`]
299/// - [`PruneStage`]
300#[derive(Debug)]
301#[non_exhaustive]
302pub struct OfflineStages<E: ConfigureEvm> {
303    /// Executor factory needs for execution stage
304    evm_config: E,
305    /// Consensus instance for validating blocks.
306    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
307    /// Configuration for each stage in the pipeline
308    stages_config: StageConfig,
309    /// Prune configuration for every segment that can be pruned
310    prune_modes: PruneModes,
311}
312
313impl<E: ConfigureEvm> OfflineStages<E> {
314    /// Create a new set of offline stages with default values.
315    pub const fn new(
316        evm_config: E,
317        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
318        stages_config: StageConfig,
319        prune_modes: PruneModes,
320    ) -> Self {
321        Self { evm_config, consensus, stages_config, prune_modes }
322    }
323}
324
325impl<E, Provider> StageSet<Provider> for OfflineStages<E>
326where
327    E: ConfigureEvm,
328    ExecutionStages<E>: StageSet<Provider>,
329    PruneSenderRecoveryStage: Stage<Provider>,
330    HashingStages: StageSet<Provider>,
331    HistoryIndexingStages: StageSet<Provider>,
332    PruneStage: Stage<Provider>,
333{
334    fn builder(self) -> StageSetBuilder<Provider> {
335        ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
336            .builder()
337            // If sender recovery prune mode is set, add the prune sender recovery stage.
338            .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
339                PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
340            }))
341            .add_set(HashingStages { stages_config: self.stages_config.clone() })
342            .add_set(HistoryIndexingStages {
343                stages_config: self.stages_config.clone(),
344                prune_modes: self.prune_modes.clone(),
345            })
346            // Prune stage should be added after all hashing stages, because otherwise it will
347            // delete
348            .add_stage(PruneStage::new(
349                self.prune_modes.clone(),
350                self.stages_config.prune.commit_threshold,
351            ))
352    }
353}
354
355/// A set containing all stages that are required to execute pre-existing block data.
356#[derive(Debug)]
357#[non_exhaustive]
358pub struct ExecutionStages<E: ConfigureEvm> {
359    /// Executor factory that will create executors.
360    evm_config: E,
361    /// Consensus instance for validating blocks.
362    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
363    /// Configuration for each stage in the pipeline
364    stages_config: StageConfig,
365}
366
367impl<E: ConfigureEvm> ExecutionStages<E> {
368    /// Create a new set of execution stages with default values.
369    pub const fn new(
370        executor_provider: E,
371        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
372        stages_config: StageConfig,
373    ) -> Self {
374        Self { evm_config: executor_provider, consensus, stages_config }
375    }
376}
377
378impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
379where
380    E: ConfigureEvm + 'static,
381    SenderRecoveryStage: Stage<Provider>,
382    ExecutionStage<E>: Stage<Provider>,
383{
384    fn builder(self) -> StageSetBuilder<Provider> {
385        StageSetBuilder::default()
386            .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
387            .add_stage(ExecutionStage::from_config(
388                self.evm_config,
389                self.consensus,
390                self.stages_config.execution,
391                self.stages_config.execution_external_clean_threshold(),
392            ))
393    }
394}
395
396/// A set containing all stages that hash account state.
397///
398/// This includes:
399/// - [`MerkleStage`] (unwind)
400/// - [`AccountHashingStage`]
401/// - [`StorageHashingStage`]
402/// - [`MerkleStage`] (execute)
403/// - [`MerkleChangeSets`]
404#[derive(Debug, Default)]
405#[non_exhaustive]
406pub struct HashingStages {
407    /// Configuration for each stage in the pipeline
408    stages_config: StageConfig,
409}
410
411impl<Provider> StageSet<Provider> for HashingStages
412where
413    MerkleStage: Stage<Provider>,
414    AccountHashingStage: Stage<Provider>,
415    StorageHashingStage: Stage<Provider>,
416    MerkleChangeSets: Stage<Provider>,
417{
418    fn builder(self) -> StageSetBuilder<Provider> {
419        StageSetBuilder::default()
420            .add_stage(MerkleStage::default_unwind())
421            .add_stage(AccountHashingStage::new(
422                self.stages_config.account_hashing,
423                self.stages_config.etl.clone(),
424            ))
425            .add_stage(StorageHashingStage::new(
426                self.stages_config.storage_hashing,
427                self.stages_config.etl.clone(),
428            ))
429            .add_stage(MerkleStage::new_execution(
430                self.stages_config.merkle.rebuild_threshold,
431                self.stages_config.merkle.incremental_threshold,
432            ))
433            .add_stage(MerkleChangeSets::new())
434    }
435}
436
437/// A set containing all stages that do additional indexing for historical state.
438#[derive(Debug, Default)]
439#[non_exhaustive]
440pub struct HistoryIndexingStages {
441    /// Configuration for each stage in the pipeline
442    stages_config: StageConfig,
443    /// Prune configuration for every segment that can be pruned
444    prune_modes: PruneModes,
445}
446
447impl<Provider> StageSet<Provider> for HistoryIndexingStages
448where
449    TransactionLookupStage: Stage<Provider>,
450    IndexStorageHistoryStage: Stage<Provider>,
451    IndexAccountHistoryStage: Stage<Provider>,
452{
453    fn builder(self) -> StageSetBuilder<Provider> {
454        StageSetBuilder::default()
455            .add_stage(TransactionLookupStage::new(
456                self.stages_config.transaction_lookup,
457                self.stages_config.etl.clone(),
458                self.prune_modes.transaction_lookup,
459            ))
460            .add_stage(IndexStorageHistoryStage::new(
461                self.stages_config.index_storage_history,
462                self.stages_config.etl.clone(),
463                self.prune_modes.storage_history,
464            ))
465            .add_stage(IndexAccountHistoryStage::new(
466                self.stages_config.index_account_history,
467                self.stages_config.etl.clone(),
468                self.prune_modes.account_history,
469            ))
470    }
471}