1use crate::{
40 stages::{
41 AccountHashingStage, BodyStage, EraImportSource, EraStage, ExecutionStage, FinishStage,
42 HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage,
43 PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage, StorageHashingStage,
44 TransactionLookupStage,
45 },
46 StageSet, StageSetBuilder,
47};
48use alloy_primitives::B256;
49use reth_config::config::StageConfig;
50use reth_consensus::FullConsensus;
51use reth_evm::ConfigureEvm;
52use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
53use reth_primitives_traits::{Block, NodePrimitives};
54use reth_provider::HeaderSyncGapProvider;
55use reth_prune_types::{PruneMode, PruneModes};
56use reth_stages_api::Stage;
57use std::sync::Arc;
58use tokio::sync::watch;
59
/// The complete stage set assembled by this module: the online stages, followed by the offline
/// stages, followed by a [`FinishStage`].
///
/// The [`StageSet`] implementation builds, in order:
/// - the stages produced by [`OnlineStages`]
/// - the stages produced by [`OfflineStages`]
/// - [`FinishStage`]
#[derive(Debug)]
pub struct DefaultStages<Provider, H, B, E>
where
    H: HeaderDownloader,
    B: BodyDownloader,
    E: ConfigureEvm,
{
    /// Stage set built first; it owns the provider, the tip receiver and both downloaders.
    online: OnlineStages<Provider, H, B>,
    /// EVM configuration handed to [`OfflineStages`].
    evm_config: E,
    /// Consensus implementation handed to [`OfflineStages`].
    consensus: Arc<dyn FullConsensus<E::Primitives>>,
    /// Per-stage configuration handed to [`OfflineStages`].
    stages_config: StageConfig,
    /// Prune modes handed to [`OfflineStages`].
    prune_modes: PruneModes,
}
102
103impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
104where
105 H: HeaderDownloader,
106 B: BodyDownloader,
107 E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
108{
109 #[expect(clippy::too_many_arguments)]
111 pub fn new(
112 provider: Provider,
113 tip: watch::Receiver<B256>,
114 consensus: Arc<dyn FullConsensus<E::Primitives>>,
115 header_downloader: H,
116 body_downloader: B,
117 evm_config: E,
118 stages_config: StageConfig,
119 prune_modes: PruneModes,
120 era_import_source: Option<EraImportSource>,
121 ) -> Self {
122 Self {
123 online: OnlineStages::new(
124 provider,
125 tip,
126 header_downloader,
127 body_downloader,
128 stages_config.clone(),
129 era_import_source,
130 ),
131 evm_config,
132 consensus,
133 stages_config,
134 prune_modes,
135 }
136 }
137}
138
139impl<P, H, B, E> DefaultStages<P, H, B, E>
140where
141 E: ConfigureEvm,
142 H: HeaderDownloader,
143 B: BodyDownloader,
144{
145 pub fn add_offline_stages<Provider>(
147 default_offline: StageSetBuilder<Provider>,
148 evm_config: E,
149 consensus: Arc<dyn FullConsensus<E::Primitives>>,
150 stages_config: StageConfig,
151 prune_modes: PruneModes,
152 ) -> StageSetBuilder<Provider>
153 where
154 OfflineStages<E>: StageSet<Provider>,
155 {
156 StageSetBuilder::default()
157 .add_set(default_offline)
158 .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
159 .add_stage(FinishStage)
160 }
161}
162
163impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
164where
165 P: HeaderSyncGapProvider + 'static,
166 H: HeaderDownloader + 'static,
167 B: BodyDownloader + 'static,
168 E: ConfigureEvm,
169 OnlineStages<P, H, B>: StageSet<Provider>,
170 OfflineStages<E>: StageSet<Provider>,
171{
172 fn builder(self) -> StageSetBuilder<Provider> {
173 Self::add_offline_stages(
174 self.online.builder(),
175 self.evm_config,
176 self.consensus,
177 self.stages_config.clone(),
178 self.prune_modes,
179 )
180 }
181}
182
/// The set of stages that are driven by the header and body downloaders.
///
/// The [`StageSet`] implementation builds, in order:
/// - an [`EraStage`], only when `era_import_source` is `Some`
/// - a [`HeaderStage`]
/// - a [`BodyStage`]
#[derive(Debug)]
pub struct OnlineStages<Provider, H, B>
where
    H: HeaderDownloader,
    B: BodyDownloader,
{
    /// Provider passed to the [`HeaderStage`].
    provider: Provider,
    /// Receiving half of a watch channel carrying a [`B256`], passed to the [`HeaderStage`].
    // NOTE(review): presumably the hash of the sync target (chain tip) — confirm.
    tip: watch::Receiver<B256>,

    /// Downloader passed to the [`HeaderStage`].
    header_downloader: H,
    /// Downloader passed to the [`BodyStage`].
    body_downloader: B,
    /// Stage configuration; the [`StageSet`] implementation reads its `etl` section.
    stages_config: StageConfig,
    /// Optional ERA import source; when present an [`EraStage`] is added before the headers.
    era_import_source: Option<EraImportSource>,
}
207
208impl<Provider, H, B> OnlineStages<Provider, H, B>
209where
210 H: HeaderDownloader,
211 B: BodyDownloader,
212{
213 pub const fn new(
215 provider: Provider,
216 tip: watch::Receiver<B256>,
217 header_downloader: H,
218 body_downloader: B,
219 stages_config: StageConfig,
220 era_import_source: Option<EraImportSource>,
221 ) -> Self {
222 Self { provider, tip, header_downloader, body_downloader, stages_config, era_import_source }
223 }
224}
225
226impl<P, H, B> OnlineStages<P, H, B>
227where
228 P: HeaderSyncGapProvider + 'static,
229 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
230 B: BodyDownloader + 'static,
231{
232 pub fn builder_with_headers<Provider>(
234 headers: HeaderStage<P, H>,
235 body_downloader: B,
236 ) -> StageSetBuilder<Provider>
237 where
238 HeaderStage<P, H>: Stage<Provider>,
239 BodyStage<B>: Stage<Provider>,
240 {
241 StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
242 }
243
244 pub fn builder_with_bodies<Provider>(
246 bodies: BodyStage<B>,
247 provider: P,
248 tip: watch::Receiver<B256>,
249 header_downloader: H,
250 stages_config: StageConfig,
251 ) -> StageSetBuilder<Provider>
252 where
253 BodyStage<B>: Stage<Provider>,
254 HeaderStage<P, H>: Stage<Provider>,
255 {
256 StageSetBuilder::default()
257 .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
258 .add_stage(bodies)
259 }
260}
261
262impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
263where
264 P: HeaderSyncGapProvider + 'static,
265 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
266 B: BodyDownloader + 'static,
267 HeaderStage<P, H>: Stage<Provider>,
268 BodyStage<B>: Stage<Provider>,
269 EraStage<<B::Block as Block>::Header, <B::Block as Block>::Body, EraImportSource>:
270 Stage<Provider>,
271{
272 fn builder(self) -> StageSetBuilder<Provider> {
273 let mut builder = StageSetBuilder::default();
274
275 if self.era_import_source.is_some() {
276 builder = builder
277 .add_stage(EraStage::new(self.era_import_source, self.stages_config.etl.clone()));
278 }
279
280 builder
281 .add_stage(HeaderStage::new(
282 self.provider,
283 self.header_downloader,
284 self.tip,
285 self.stages_config.etl.clone(),
286 ))
287 .add_stage(BodyStage::new(self.body_downloader))
288 }
289}
290
/// The set of stages that are built without any downloader.
///
/// The [`StageSet`] implementation builds, in order:
/// - the stages produced by [`ExecutionStages`]
/// - a [`PruneSenderRecoveryStage`], only when `prune_modes.sender_recovery` is `Some`
/// - the stages produced by [`HashingStages`]
/// - the stages produced by [`HistoryIndexingStages`]
/// - a [`PruneStage`]
#[derive(Debug)]
#[non_exhaustive]
pub struct OfflineStages<E: ConfigureEvm> {
    /// EVM configuration passed to [`ExecutionStages`].
    evm_config: E,
    /// Consensus implementation passed to [`ExecutionStages`].
    consensus: Arc<dyn FullConsensus<E::Primitives>>,
    /// Per-stage configuration shared by all contained stage sets.
    stages_config: StageConfig,
    /// Prune modes used by the pruning and history indexing stages.
    prune_modes: PruneModes,
}
312
313impl<E: ConfigureEvm> OfflineStages<E> {
314 pub const fn new(
316 evm_config: E,
317 consensus: Arc<dyn FullConsensus<E::Primitives>>,
318 stages_config: StageConfig,
319 prune_modes: PruneModes,
320 ) -> Self {
321 Self { evm_config, consensus, stages_config, prune_modes }
322 }
323}
324
325impl<E, Provider> StageSet<Provider> for OfflineStages<E>
326where
327 E: ConfigureEvm,
328 ExecutionStages<E>: StageSet<Provider>,
329 PruneSenderRecoveryStage: Stage<Provider>,
330 HashingStages: StageSet<Provider>,
331 HistoryIndexingStages: StageSet<Provider>,
332 PruneStage: Stage<Provider>,
333{
334 fn builder(self) -> StageSetBuilder<Provider> {
335 ExecutionStages::new(
336 self.evm_config,
337 self.consensus,
338 self.stages_config.clone(),
339 self.prune_modes.sender_recovery,
340 )
341 .builder()
342 .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
344 PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
345 }))
346 .add_set(HashingStages { stages_config: self.stages_config.clone() })
347 .add_set(HistoryIndexingStages {
348 stages_config: self.stages_config.clone(),
349 prune_modes: self.prune_modes.clone(),
350 })
351 .add_stage(PruneStage::new(
354 self.prune_modes.clone(),
355 self.stages_config.prune.commit_threshold,
356 ))
357 }
358}
359
/// The set of stages around block execution.
///
/// The [`StageSet`] implementation builds a [`SenderRecoveryStage`] followed by an
/// [`ExecutionStage`].
#[derive(Debug)]
#[non_exhaustive]
pub struct ExecutionStages<E: ConfigureEvm> {
    /// EVM configuration passed to the [`ExecutionStage`].
    evm_config: E,
    /// Consensus implementation passed to the [`ExecutionStage`].
    consensus: Arc<dyn FullConsensus<E::Primitives>>,
    /// Per-stage configuration; its `sender_recovery` and `execution` sections are used.
    stages_config: StageConfig,
    /// Optional prune mode passed to the [`SenderRecoveryStage`].
    sender_recovery_prune_mode: Option<PruneMode>,
}
373
374impl<E: ConfigureEvm> ExecutionStages<E> {
375 pub const fn new(
377 executor_provider: E,
378 consensus: Arc<dyn FullConsensus<E::Primitives>>,
379 stages_config: StageConfig,
380 sender_recovery_prune_mode: Option<PruneMode>,
381 ) -> Self {
382 Self { evm_config: executor_provider, consensus, stages_config, sender_recovery_prune_mode }
383 }
384}
385
386impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
387where
388 E: ConfigureEvm + 'static,
389 SenderRecoveryStage: Stage<Provider>,
390 ExecutionStage<E>: Stage<Provider>,
391{
392 fn builder(self) -> StageSetBuilder<Provider> {
393 StageSetBuilder::default()
394 .add_stage(SenderRecoveryStage::new(
395 self.stages_config.sender_recovery,
396 self.sender_recovery_prune_mode,
397 ))
398 .add_stage(ExecutionStage::from_config(
399 self.evm_config,
400 self.consensus,
401 self.stages_config.execution,
402 self.stages_config.execution_external_clean_threshold(),
403 ))
404 }
405}
406
/// The set of stages around state hashing.
///
/// The [`StageSet`] implementation builds, in order: the unwind variant of the
/// [`MerkleStage`], an [`AccountHashingStage`], a [`StorageHashingStage`] and the execution
/// variant of the [`MerkleStage`].
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HashingStages {
    /// Per-stage configuration; its `account_hashing`, `storage_hashing`, `etl` and `merkle`
    /// sections are used.
    stages_config: StageConfig,
}
420
421impl<Provider> StageSet<Provider> for HashingStages
422where
423 MerkleStage: Stage<Provider>,
424 AccountHashingStage: Stage<Provider>,
425 StorageHashingStage: Stage<Provider>,
426{
427 fn builder(self) -> StageSetBuilder<Provider> {
428 StageSetBuilder::default()
429 .add_stage(MerkleStage::default_unwind())
430 .add_stage(AccountHashingStage::new(
431 self.stages_config.account_hashing,
432 self.stages_config.etl.clone(),
433 ))
434 .add_stage(StorageHashingStage::new(
435 self.stages_config.storage_hashing,
436 self.stages_config.etl.clone(),
437 ))
438 .add_stage(MerkleStage::new_execution(
439 self.stages_config.merkle.rebuild_threshold,
440 self.stages_config.merkle.incremental_threshold,
441 ))
442 }
443}
444
/// The set of stages around history indexing.
///
/// The [`StageSet`] implementation builds, in order: a [`TransactionLookupStage`], an
/// [`IndexStorageHistoryStage`] and an [`IndexAccountHistoryStage`].
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HistoryIndexingStages {
    /// Per-stage configuration; its `transaction_lookup`, `index_storage_history`,
    /// `index_account_history` and `etl` sections are used.
    stages_config: StageConfig,
    /// Prune modes; the `transaction_lookup`, `storage_history` and `account_history` entries
    /// are passed to the corresponding stages.
    prune_modes: PruneModes,
}
454
455impl<Provider> StageSet<Provider> for HistoryIndexingStages
456where
457 TransactionLookupStage: Stage<Provider>,
458 IndexStorageHistoryStage: Stage<Provider>,
459 IndexAccountHistoryStage: Stage<Provider>,
460{
461 fn builder(self) -> StageSetBuilder<Provider> {
462 StageSetBuilder::default()
463 .add_stage(TransactionLookupStage::new(
464 self.stages_config.transaction_lookup,
465 self.stages_config.etl.clone(),
466 self.prune_modes.transaction_lookup,
467 ))
468 .add_stage(IndexStorageHistoryStage::new(
469 self.stages_config.index_storage_history,
470 self.stages_config.etl.clone(),
471 self.prune_modes.storage_history,
472 ))
473 .add_stage(IndexAccountHistoryStage::new(
474 self.stages_config.index_account_history,
475 self.stages_config.etl.clone(),
476 self.prune_modes.account_history,
477 ))
478 }
479}