1use crate::{
40 stages::{
41 AccountHashingStage, BodyStage, EraImportSource, EraStage, ExecutionStage, FinishStage,
42 HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleChangeSets,
43 MerkleStage, PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage,
44 StorageHashingStage, TransactionLookupStage,
45 },
46 StageSet, StageSetBuilder,
47};
48use alloy_primitives::B256;
49use reth_config::config::StageConfig;
50use reth_consensus::{ConsensusError, FullConsensus};
51use reth_evm::ConfigureEvm;
52use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
53use reth_primitives_traits::{Block, NodePrimitives};
54use reth_provider::HeaderSyncGapProvider;
55use reth_prune_types::PruneModes;
56use reth_stages_api::Stage;
57use std::sync::Arc;
58use tokio::sync::watch;
59
/// A stage set that chains the download stages ([`OnlineStages`]) with the stages that
/// process already-downloaded data ([`OfflineStages`]) and a trailing [`FinishStage`].
///
/// The [`StageSet`] implementation builds the online set first and then appends the offline
/// set and the finish stage through [`DefaultStages::add_offline_stages`].
#[derive(Debug)]
pub struct DefaultStages<Provider, H, B, E>
where
    H: HeaderDownloader,
    B: BodyDownloader,
    E: ConfigureEvm,
{
    /// The stage set that is built first (header/body download, optional ERA import).
    online: OnlineStages<Provider, H, B>,
    /// EVM configuration forwarded to the offline stages.
    evm_config: E,
    /// Consensus implementation forwarded to the offline stages.
    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
    /// Per-stage configuration forwarded to the offline stages.
    stages_config: StageConfig,
    /// Prune configuration forwarded to the offline stages.
    prune_modes: PruneModes,
}
102
103impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
104where
105 H: HeaderDownloader,
106 B: BodyDownloader,
107 E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
108{
109 #[expect(clippy::too_many_arguments)]
111 pub fn new(
112 provider: Provider,
113 tip: watch::Receiver<B256>,
114 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
115 header_downloader: H,
116 body_downloader: B,
117 evm_config: E,
118 stages_config: StageConfig,
119 prune_modes: PruneModes,
120 era_import_source: Option<EraImportSource>,
121 ) -> Self {
122 Self {
123 online: OnlineStages::new(
124 provider,
125 tip,
126 header_downloader,
127 body_downloader,
128 stages_config.clone(),
129 era_import_source,
130 ),
131 evm_config,
132 consensus,
133 stages_config,
134 prune_modes,
135 }
136 }
137}
138
139impl<P, H, B, E> DefaultStages<P, H, B, E>
140where
141 E: ConfigureEvm,
142 H: HeaderDownloader,
143 B: BodyDownloader,
144{
145 pub fn add_offline_stages<Provider>(
147 default_offline: StageSetBuilder<Provider>,
148 evm_config: E,
149 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
150 stages_config: StageConfig,
151 prune_modes: PruneModes,
152 ) -> StageSetBuilder<Provider>
153 where
154 OfflineStages<E>: StageSet<Provider>,
155 {
156 StageSetBuilder::default()
157 .add_set(default_offline)
158 .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
159 .add_stage(FinishStage)
160 }
161}
162
163impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
164where
165 P: HeaderSyncGapProvider + 'static,
166 H: HeaderDownloader + 'static,
167 B: BodyDownloader + 'static,
168 E: ConfigureEvm,
169 OnlineStages<P, H, B>: StageSet<Provider>,
170 OfflineStages<E>: StageSet<Provider>,
171{
172 fn builder(self) -> StageSetBuilder<Provider> {
173 Self::add_offline_stages(
174 self.online.builder(),
175 self.evm_config,
176 self.consensus,
177 self.stages_config.clone(),
178 self.prune_modes,
179 )
180 }
181}
182
/// A stage set made of the download stages.
///
/// Its [`StageSet`] implementation produces, in order: an [`EraStage`] (only when an ERA import
/// source is configured), a [`HeaderStage`] and a [`BodyStage`].
#[derive(Debug)]
pub struct OnlineStages<Provider, H, B>
where
    H: HeaderDownloader,
    B: BodyDownloader,
{
    /// Provider handed to the header stage.
    provider: Provider,
    /// Watch channel receiver carrying a block hash, handed to the header stage.
    // NOTE(review): presumably the hash of the sync target (chain tip) — confirm.
    tip: watch::Receiver<B256>,

    /// Downloader used by the header stage.
    header_downloader: H,
    /// Downloader used by the body stage.
    body_downloader: B,
    /// Per-stage configuration; this set only reads its `etl` section.
    stages_config: StageConfig,
    /// Optional ERA import source; when present an [`EraStage`] is added before the header stage.
    era_import_source: Option<EraImportSource>,
}
207
impl<Provider, H, B> OnlineStages<Provider, H, B>
where
    H: HeaderDownloader,
    B: BodyDownloader,
{
    /// Creates a new [`OnlineStages`] set from its parts.
    ///
    /// All arguments are stored unchanged; no stage is constructed here.
    pub const fn new(
        provider: Provider,
        tip: watch::Receiver<B256>,
        header_downloader: H,
        body_downloader: B,
        stages_config: StageConfig,
        era_import_source: Option<EraImportSource>,
    ) -> Self {
        Self { provider, tip, header_downloader, body_downloader, stages_config, era_import_source }
    }
}
225
226impl<P, H, B> OnlineStages<P, H, B>
227where
228 P: HeaderSyncGapProvider + 'static,
229 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
230 B: BodyDownloader + 'static,
231{
232 pub fn builder_with_headers<Provider>(
234 headers: HeaderStage<P, H>,
235 body_downloader: B,
236 ) -> StageSetBuilder<Provider>
237 where
238 HeaderStage<P, H>: Stage<Provider>,
239 BodyStage<B>: Stage<Provider>,
240 {
241 StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
242 }
243
244 pub fn builder_with_bodies<Provider>(
246 bodies: BodyStage<B>,
247 provider: P,
248 tip: watch::Receiver<B256>,
249 header_downloader: H,
250 stages_config: StageConfig,
251 ) -> StageSetBuilder<Provider>
252 where
253 BodyStage<B>: Stage<Provider>,
254 HeaderStage<P, H>: Stage<Provider>,
255 {
256 StageSetBuilder::default()
257 .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
258 .add_stage(bodies)
259 }
260}
261
262impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
263where
264 P: HeaderSyncGapProvider + 'static,
265 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
266 B: BodyDownloader + 'static,
267 HeaderStage<P, H>: Stage<Provider>,
268 BodyStage<B>: Stage<Provider>,
269 EraStage<<B::Block as Block>::Header, <B::Block as Block>::Body, EraImportSource>:
270 Stage<Provider>,
271{
272 fn builder(self) -> StageSetBuilder<Provider> {
273 let mut builder = StageSetBuilder::default();
274
275 if self.era_import_source.is_some() {
276 builder = builder
277 .add_stage(EraStage::new(self.era_import_source, self.stages_config.etl.clone()));
278 }
279
280 builder
281 .add_stage(HeaderStage::new(
282 self.provider,
283 self.header_downloader,
284 self.tip,
285 self.stages_config.etl.clone(),
286 ))
287 .add_stage(BodyStage::new(self.body_downloader))
288 }
289}
290
/// A stage set made of the stages that run after headers and bodies are available.
///
/// Its [`StageSet`] implementation produces, in order: the [`ExecutionStages`] set, a
/// [`PruneSenderRecoveryStage`] (only when sender-recovery pruning is configured), the
/// [`HashingStages`] set, the [`HistoryIndexingStages`] set and a [`PruneStage`].
#[derive(Debug)]
#[non_exhaustive]
pub struct OfflineStages<E: ConfigureEvm> {
    /// EVM configuration forwarded to the execution stages.
    evm_config: E,
    /// Consensus implementation forwarded to the execution stages.
    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
    /// Per-stage configuration shared by all contained sets.
    stages_config: StageConfig,
    /// Prune configuration used by the pruning and history indexing stages.
    prune_modes: PruneModes,
}
312
impl<E: ConfigureEvm> OfflineStages<E> {
    /// Creates a new [`OfflineStages`] set.
    ///
    /// All arguments are stored unchanged; no stage is constructed here.
    pub const fn new(
        evm_config: E,
        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
        stages_config: StageConfig,
        prune_modes: PruneModes,
    ) -> Self {
        Self { evm_config, consensus, stages_config, prune_modes }
    }
}
324
325impl<E, Provider> StageSet<Provider> for OfflineStages<E>
326where
327 E: ConfigureEvm,
328 ExecutionStages<E>: StageSet<Provider>,
329 PruneSenderRecoveryStage: Stage<Provider>,
330 HashingStages: StageSet<Provider>,
331 HistoryIndexingStages: StageSet<Provider>,
332 PruneStage: Stage<Provider>,
333{
334 fn builder(self) -> StageSetBuilder<Provider> {
335 ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
336 .builder()
337 .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
339 PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
340 }))
341 .add_set(HashingStages { stages_config: self.stages_config.clone() })
342 .add_set(HistoryIndexingStages {
343 stages_config: self.stages_config.clone(),
344 prune_modes: self.prune_modes.clone(),
345 })
346 .add_stage(PruneStage::new(
349 self.prune_modes.clone(),
350 self.stages_config.prune.commit_threshold,
351 ))
352 }
353}
354
/// A stage set containing a [`SenderRecoveryStage`] followed by an [`ExecutionStage`].
#[derive(Debug)]
#[non_exhaustive]
pub struct ExecutionStages<E: ConfigureEvm> {
    /// EVM configuration passed to the execution stage.
    evm_config: E,
    /// Consensus implementation passed to the execution stage.
    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
    /// Per-stage configuration; the sender recovery and execution sections are read from it.
    stages_config: StageConfig,
}
366
367impl<E: ConfigureEvm> ExecutionStages<E> {
368 pub const fn new(
370 executor_provider: E,
371 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
372 stages_config: StageConfig,
373 ) -> Self {
374 Self { evm_config: executor_provider, consensus, stages_config }
375 }
376}
377
378impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
379where
380 E: ConfigureEvm + 'static,
381 SenderRecoveryStage: Stage<Provider>,
382 ExecutionStage<E>: Stage<Provider>,
383{
384 fn builder(self) -> StageSetBuilder<Provider> {
385 StageSetBuilder::default()
386 .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
387 .add_stage(ExecutionStage::from_config(
388 self.evm_config,
389 self.consensus,
390 self.stages_config.execution,
391 self.stages_config.execution_external_clean_threshold(),
392 ))
393 }
394}
395
/// A stage set containing the hashing and merkle stages.
///
/// Its [`StageSet`] implementation produces, in order: [`MerkleStage`] (built with
/// `default_unwind`), [`AccountHashingStage`], [`StorageHashingStage`], [`MerkleStage`]
/// (built with `new_execution`) and [`MerkleChangeSets`].
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HashingStages {
    /// Per-stage configuration; the hashing, merkle and ETL sections are read from it.
    stages_config: StageConfig,
}
410
411impl<Provider> StageSet<Provider> for HashingStages
412where
413 MerkleStage: Stage<Provider>,
414 AccountHashingStage: Stage<Provider>,
415 StorageHashingStage: Stage<Provider>,
416 MerkleChangeSets: Stage<Provider>,
417{
418 fn builder(self) -> StageSetBuilder<Provider> {
419 StageSetBuilder::default()
420 .add_stage(MerkleStage::default_unwind())
421 .add_stage(AccountHashingStage::new(
422 self.stages_config.account_hashing,
423 self.stages_config.etl.clone(),
424 ))
425 .add_stage(StorageHashingStage::new(
426 self.stages_config.storage_hashing,
427 self.stages_config.etl.clone(),
428 ))
429 .add_stage(MerkleStage::new_execution(
430 self.stages_config.merkle.rebuild_threshold,
431 self.stages_config.merkle.incremental_threshold,
432 ))
433 .add_stage(MerkleChangeSets::new())
434 }
435}
436
/// A stage set containing the lookup and history indexing stages.
///
/// Its [`StageSet`] implementation produces, in order: [`TransactionLookupStage`],
/// [`IndexStorageHistoryStage`] and [`IndexAccountHistoryStage`].
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HistoryIndexingStages {
    /// Per-stage configuration; each stage reads its own section plus the shared ETL section.
    stages_config: StageConfig,
    /// Prune configuration; each stage receives the entry that corresponds to it.
    prune_modes: PruneModes,
}
446
447impl<Provider> StageSet<Provider> for HistoryIndexingStages
448where
449 TransactionLookupStage: Stage<Provider>,
450 IndexStorageHistoryStage: Stage<Provider>,
451 IndexAccountHistoryStage: Stage<Provider>,
452{
453 fn builder(self) -> StageSetBuilder<Provider> {
454 StageSetBuilder::default()
455 .add_stage(TransactionLookupStage::new(
456 self.stages_config.transaction_lookup,
457 self.stages_config.etl.clone(),
458 self.prune_modes.transaction_lookup,
459 ))
460 .add_stage(IndexStorageHistoryStage::new(
461 self.stages_config.index_storage_history,
462 self.stages_config.etl.clone(),
463 self.prune_modes.storage_history,
464 ))
465 .add_stage(IndexAccountHistoryStage::new(
466 self.stages_config.index_account_history,
467 self.stages_config.etl.clone(),
468 self.prune_modes.account_history,
469 ))
470 }
471}