reth_stages/
sets.rs

1//! Built-in [`StageSet`]s.
2//!
3//! The easiest set to use is [`DefaultStages`], which provides all stages required to run an
4//! instance of reth.
5//!
6//! It is also possible to run parts of reth standalone given the required data is present in
7//! the environment, such as [`ExecutionStages`] or [`HashingStages`].
8//!
9//!
10//! # Examples
11//!
12//! ```no_run
13//! # use reth_stages::Pipeline;
14//! # use reth_stages::sets::{OfflineStages};
15//! # use reth_chainspec::MAINNET;
16//! # use reth_prune_types::PruneModes;
17//! # use reth_evm_ethereum::EthEvmConfig;
18//! # use reth_evm::ConfigureEvm;
19//! # use reth_provider::StaticFileProviderFactory;
20//! # use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
21//! # use reth_static_file::StaticFileProducer;
22//! # use reth_config::config::StageConfig;
23//! # use reth_ethereum_primitives::EthPrimitives;
24//! # use std::sync::Arc;
25//! # use reth_consensus::FullConsensus;
26//!
27//! # fn create(exec: impl ConfigureEvm<Primitives = EthPrimitives> + 'static, consensus: impl FullConsensus<EthPrimitives> + 'static) {
28//!
29//! let provider_factory = create_test_provider_factory();
30//! let static_file_producer =
31//!     StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
32//! // Build a pipeline with all offline stages.
33//! let pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
34//!     .add_stages(OfflineStages::new(exec, Arc::new(consensus), StageConfig::default(), PruneModes::default()))
35//!     .build(provider_factory, static_file_producer);
36//!
37//! # }
38//! ```
39use crate::{
40    stages::{
41        AccountHashingStage, BodyStage, EraImportSource, EraStage, ExecutionStage, FinishStage,
42        HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleChangeSets,
43        MerkleStage, PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage,
44        StorageHashingStage, TransactionLookupStage,
45    },
46    StageSet, StageSetBuilder,
47};
48use alloy_primitives::B256;
49use reth_config::config::StageConfig;
50use reth_consensus::FullConsensus;
51use reth_evm::ConfigureEvm;
52use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
53use reth_primitives_traits::{Block, NodePrimitives};
54use reth_provider::HeaderSyncGapProvider;
55use reth_prune_types::PruneModes;
56use reth_stages_api::Stage;
57use std::sync::Arc;
58use tokio::sync::watch;
59
60/// A set containing all stages to run a fully syncing instance of reth.
61///
62/// A combination of (in order)
63///
64/// - [`OnlineStages`]
65/// - [`OfflineStages`]
66/// - [`FinishStage`]
67///
68/// This expands to the following series of stages:
69/// - [`EraStage`] (optional, for ERA1 import)
70/// - [`HeaderStage`]
71/// - [`BodyStage`]
72/// - [`SenderRecoveryStage`]
73/// - [`ExecutionStage`]
74/// - [`PruneSenderRecoveryStage`] (execute)
75/// - [`MerkleStage`] (unwind)
76/// - [`AccountHashingStage`]
77/// - [`StorageHashingStage`]
78/// - [`MerkleStage`] (execute)
79/// - [`MerkleChangeSets`]
80/// - [`TransactionLookupStage`]
81/// - [`IndexStorageHistoryStage`]
82/// - [`IndexAccountHistoryStage`]
83/// - [`PruneStage`] (execute)
84/// - [`FinishStage`]
85#[derive(Debug)]
86pub struct DefaultStages<Provider, H, B, E>
87where
88    H: HeaderDownloader,
89    B: BodyDownloader,
90    E: ConfigureEvm,
91{
92    /// Configuration for the online stages
93    online: OnlineStages<Provider, H, B>,
94    /// Executor factory needs for execution stage
95    evm_config: E,
96    /// Consensus instance
97    consensus: Arc<dyn FullConsensus<E::Primitives>>,
98    /// Configuration for each stage in the pipeline
99    stages_config: StageConfig,
100    /// Prune configuration for every segment that can be pruned
101    prune_modes: PruneModes,
102}
103
104impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
105where
106    H: HeaderDownloader,
107    B: BodyDownloader,
108    E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
109{
110    /// Create a new set of default stages with default values.
111    #[expect(clippy::too_many_arguments)]
112    pub fn new(
113        provider: Provider,
114        tip: watch::Receiver<B256>,
115        consensus: Arc<dyn FullConsensus<E::Primitives>>,
116        header_downloader: H,
117        body_downloader: B,
118        evm_config: E,
119        stages_config: StageConfig,
120        prune_modes: PruneModes,
121        era_import_source: Option<EraImportSource>,
122    ) -> Self {
123        Self {
124            online: OnlineStages::new(
125                provider,
126                tip,
127                header_downloader,
128                body_downloader,
129                stages_config.clone(),
130                era_import_source,
131            ),
132            evm_config,
133            consensus,
134            stages_config,
135            prune_modes,
136        }
137    }
138}
139
140impl<P, H, B, E> DefaultStages<P, H, B, E>
141where
142    E: ConfigureEvm,
143    H: HeaderDownloader,
144    B: BodyDownloader,
145{
146    /// Appends the default offline stages and default finish stage to the given builder.
147    pub fn add_offline_stages<Provider>(
148        default_offline: StageSetBuilder<Provider>,
149        evm_config: E,
150        consensus: Arc<dyn FullConsensus<E::Primitives>>,
151        stages_config: StageConfig,
152        prune_modes: PruneModes,
153    ) -> StageSetBuilder<Provider>
154    where
155        OfflineStages<E>: StageSet<Provider>,
156    {
157        StageSetBuilder::default()
158            .add_set(default_offline)
159            .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
160            .add_stage(FinishStage)
161    }
162}
163
164impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
165where
166    P: HeaderSyncGapProvider + 'static,
167    H: HeaderDownloader + 'static,
168    B: BodyDownloader + 'static,
169    E: ConfigureEvm,
170    OnlineStages<P, H, B>: StageSet<Provider>,
171    OfflineStages<E>: StageSet<Provider>,
172{
173    fn builder(self) -> StageSetBuilder<Provider> {
174        Self::add_offline_stages(
175            self.online.builder(),
176            self.evm_config,
177            self.consensus,
178            self.stages_config.clone(),
179            self.prune_modes,
180        )
181    }
182}
183
184/// A set containing all stages that require network access by default.
185///
186/// These stages *can* be run without network access if the specified downloaders are
187/// themselves offline.
188#[derive(Debug)]
189pub struct OnlineStages<Provider, H, B>
190where
191    H: HeaderDownloader,
192    B: BodyDownloader,
193{
194    /// Sync gap provider for the headers stage.
195    provider: Provider,
196    /// The tip for the headers stage.
197    tip: watch::Receiver<B256>,
198
199    /// The block header downloader
200    header_downloader: H,
201    /// The block body downloader
202    body_downloader: B,
203    /// Configuration for each stage in the pipeline
204    stages_config: StageConfig,
205    /// Optional source of ERA1 files. The `EraStage` does nothing unless this is specified.
206    era_import_source: Option<EraImportSource>,
207}
208
209impl<Provider, H, B> OnlineStages<Provider, H, B>
210where
211    H: HeaderDownloader,
212    B: BodyDownloader,
213{
214    /// Create a new set of online stages with default values.
215    pub const fn new(
216        provider: Provider,
217        tip: watch::Receiver<B256>,
218        header_downloader: H,
219        body_downloader: B,
220        stages_config: StageConfig,
221        era_import_source: Option<EraImportSource>,
222    ) -> Self {
223        Self { provider, tip, header_downloader, body_downloader, stages_config, era_import_source }
224    }
225}
226
227impl<P, H, B> OnlineStages<P, H, B>
228where
229    P: HeaderSyncGapProvider + 'static,
230    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
231    B: BodyDownloader + 'static,
232{
233    /// Create a new builder using the given headers stage.
234    pub fn builder_with_headers<Provider>(
235        headers: HeaderStage<P, H>,
236        body_downloader: B,
237    ) -> StageSetBuilder<Provider>
238    where
239        HeaderStage<P, H>: Stage<Provider>,
240        BodyStage<B>: Stage<Provider>,
241    {
242        StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
243    }
244
245    /// Create a new builder using the given bodies stage.
246    pub fn builder_with_bodies<Provider>(
247        bodies: BodyStage<B>,
248        provider: P,
249        tip: watch::Receiver<B256>,
250        header_downloader: H,
251        stages_config: StageConfig,
252    ) -> StageSetBuilder<Provider>
253    where
254        BodyStage<B>: Stage<Provider>,
255        HeaderStage<P, H>: Stage<Provider>,
256    {
257        StageSetBuilder::default()
258            .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
259            .add_stage(bodies)
260    }
261}
262
263impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
264where
265    P: HeaderSyncGapProvider + 'static,
266    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
267    B: BodyDownloader + 'static,
268    HeaderStage<P, H>: Stage<Provider>,
269    BodyStage<B>: Stage<Provider>,
270    EraStage<<B::Block as Block>::Header, <B::Block as Block>::Body, EraImportSource>:
271        Stage<Provider>,
272{
273    fn builder(self) -> StageSetBuilder<Provider> {
274        let mut builder = StageSetBuilder::default();
275
276        if self.era_import_source.is_some() {
277            builder = builder
278                .add_stage(EraStage::new(self.era_import_source, self.stages_config.etl.clone()));
279        }
280
281        builder
282            .add_stage(HeaderStage::new(
283                self.provider,
284                self.header_downloader,
285                self.tip,
286                self.stages_config.etl.clone(),
287            ))
288            .add_stage(BodyStage::new(self.body_downloader))
289    }
290}
291
292/// A set containing all stages that do not require network access.
293///
294/// A combination of (in order)
295///
296/// - [`ExecutionStages`]
297/// - [`PruneSenderRecoveryStage`]
298/// - [`HashingStages`]
299/// - [`HistoryIndexingStages`]
300/// - [`PruneStage`]
301#[derive(Debug)]
302#[non_exhaustive]
303pub struct OfflineStages<E: ConfigureEvm> {
304    /// Executor factory needs for execution stage
305    evm_config: E,
306    /// Consensus instance for validating blocks.
307    consensus: Arc<dyn FullConsensus<E::Primitives>>,
308    /// Configuration for each stage in the pipeline
309    stages_config: StageConfig,
310    /// Prune configuration for every segment that can be pruned
311    prune_modes: PruneModes,
312}
313
314impl<E: ConfigureEvm> OfflineStages<E> {
315    /// Create a new set of offline stages with default values.
316    pub const fn new(
317        evm_config: E,
318        consensus: Arc<dyn FullConsensus<E::Primitives>>,
319        stages_config: StageConfig,
320        prune_modes: PruneModes,
321    ) -> Self {
322        Self { evm_config, consensus, stages_config, prune_modes }
323    }
324}
325
326impl<E, Provider> StageSet<Provider> for OfflineStages<E>
327where
328    E: ConfigureEvm,
329    ExecutionStages<E>: StageSet<Provider>,
330    PruneSenderRecoveryStage: Stage<Provider>,
331    HashingStages: StageSet<Provider>,
332    HistoryIndexingStages: StageSet<Provider>,
333    PruneStage: Stage<Provider>,
334{
335    fn builder(self) -> StageSetBuilder<Provider> {
336        ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
337            .builder()
338            // If sender recovery prune mode is set, add the prune sender recovery stage.
339            .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
340                PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
341            }))
342            .add_set(HashingStages { stages_config: self.stages_config.clone() })
343            .add_set(HistoryIndexingStages {
344                stages_config: self.stages_config.clone(),
345                prune_modes: self.prune_modes.clone(),
346            })
347            // Prune stage should be added after all hashing stages, because otherwise it will
348            // delete
349            .add_stage(PruneStage::new(
350                self.prune_modes.clone(),
351                self.stages_config.prune.commit_threshold,
352            ))
353    }
354}
355
356/// A set containing all stages that are required to execute pre-existing block data.
357#[derive(Debug)]
358#[non_exhaustive]
359pub struct ExecutionStages<E: ConfigureEvm> {
360    /// Executor factory that will create executors.
361    evm_config: E,
362    /// Consensus instance for validating blocks.
363    consensus: Arc<dyn FullConsensus<E::Primitives>>,
364    /// Configuration for each stage in the pipeline
365    stages_config: StageConfig,
366}
367
368impl<E: ConfigureEvm> ExecutionStages<E> {
369    /// Create a new set of execution stages with default values.
370    pub const fn new(
371        executor_provider: E,
372        consensus: Arc<dyn FullConsensus<E::Primitives>>,
373        stages_config: StageConfig,
374    ) -> Self {
375        Self { evm_config: executor_provider, consensus, stages_config }
376    }
377}
378
379impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
380where
381    E: ConfigureEvm + 'static,
382    SenderRecoveryStage: Stage<Provider>,
383    ExecutionStage<E>: Stage<Provider>,
384{
385    fn builder(self) -> StageSetBuilder<Provider> {
386        StageSetBuilder::default()
387            .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
388            .add_stage(ExecutionStage::from_config(
389                self.evm_config,
390                self.consensus,
391                self.stages_config.execution,
392                self.stages_config.execution_external_clean_threshold(),
393            ))
394    }
395}
396
397/// A set containing all stages that hash account state.
398///
399/// This includes:
400/// - [`MerkleStage`] (unwind)
401/// - [`AccountHashingStage`]
402/// - [`StorageHashingStage`]
403/// - [`MerkleStage`] (execute)
404/// - [`MerkleChangeSets`]
405#[derive(Debug, Default)]
406#[non_exhaustive]
407pub struct HashingStages {
408    /// Configuration for each stage in the pipeline
409    stages_config: StageConfig,
410}
411
412impl<Provider> StageSet<Provider> for HashingStages
413where
414    MerkleStage: Stage<Provider>,
415    AccountHashingStage: Stage<Provider>,
416    StorageHashingStage: Stage<Provider>,
417    MerkleChangeSets: Stage<Provider>,
418{
419    fn builder(self) -> StageSetBuilder<Provider> {
420        StageSetBuilder::default()
421            .add_stage(MerkleStage::default_unwind())
422            .add_stage(AccountHashingStage::new(
423                self.stages_config.account_hashing,
424                self.stages_config.etl.clone(),
425            ))
426            .add_stage(StorageHashingStage::new(
427                self.stages_config.storage_hashing,
428                self.stages_config.etl.clone(),
429            ))
430            .add_stage(MerkleStage::new_execution(
431                self.stages_config.merkle.rebuild_threshold,
432                self.stages_config.merkle.incremental_threshold,
433            ))
434            .add_stage(MerkleChangeSets::new())
435    }
436}
437
438/// A set containing all stages that do additional indexing for historical state.
439#[derive(Debug, Default)]
440#[non_exhaustive]
441pub struct HistoryIndexingStages {
442    /// Configuration for each stage in the pipeline
443    stages_config: StageConfig,
444    /// Prune configuration for every segment that can be pruned
445    prune_modes: PruneModes,
446}
447
448impl<Provider> StageSet<Provider> for HistoryIndexingStages
449where
450    TransactionLookupStage: Stage<Provider>,
451    IndexStorageHistoryStage: Stage<Provider>,
452    IndexAccountHistoryStage: Stage<Provider>,
453{
454    fn builder(self) -> StageSetBuilder<Provider> {
455        StageSetBuilder::default()
456            .add_stage(TransactionLookupStage::new(
457                self.stages_config.transaction_lookup,
458                self.stages_config.etl.clone(),
459                self.prune_modes.transaction_lookup,
460            ))
461            .add_stage(IndexStorageHistoryStage::new(
462                self.stages_config.index_storage_history,
463                self.stages_config.etl.clone(),
464                self.prune_modes.storage_history,
465            ))
466            .add_stage(IndexAccountHistoryStage::new(
467                self.stages_config.index_account_history,
468                self.stages_config.etl.clone(),
469                self.prune_modes.account_history,
470            ))
471    }
472}