1use crate::{
40 stages::{
41 AccountHashingStage, BodyStage, EraImportSource, EraStage, ExecutionStage, FinishStage,
42 HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleChangeSets,
43 MerkleStage, PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage,
44 StorageHashingStage, TransactionLookupStage,
45 },
46 StageSet, StageSetBuilder,
47};
48use alloy_primitives::B256;
49use reth_config::config::StageConfig;
50use reth_consensus::FullConsensus;
51use reth_evm::ConfigureEvm;
52use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
53use reth_primitives_traits::{Block, NodePrimitives};
54use reth_provider::HeaderSyncGapProvider;
55use reth_prune_types::PruneModes;
56use reth_stages_api::Stage;
57use std::sync::Arc;
58use tokio::sync::watch;
59
/// The default stage set: the [`OnlineStages`] followed by the [`OfflineStages`] and a final
/// [`FinishStage`].
///
/// The struct only stores configuration; the stages are assembled when [`StageSet::builder`] is
/// called.
#[derive(Debug)]
pub struct DefaultStages<Provider, H, B, E>
where
    H: HeaderDownloader,
    B: BodyDownloader,
    E: ConfigureEvm,
{
    /// Configuration for the online stages.
    online: OnlineStages<Provider, H, B>,
    /// EVM configuration forwarded to the offline stages.
    evm_config: E,
    /// Consensus implementation forwarded to the offline stages.
    consensus: Arc<dyn FullConsensus<E::Primitives>>,
    /// Per-stage configuration forwarded to the offline stages.
    stages_config: StageConfig,
    /// Prune configuration forwarded to the offline stages.
    prune_modes: PruneModes,
}
103
104impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
105where
106 H: HeaderDownloader,
107 B: BodyDownloader,
108 E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
109{
110 #[expect(clippy::too_many_arguments)]
112 pub fn new(
113 provider: Provider,
114 tip: watch::Receiver<B256>,
115 consensus: Arc<dyn FullConsensus<E::Primitives>>,
116 header_downloader: H,
117 body_downloader: B,
118 evm_config: E,
119 stages_config: StageConfig,
120 prune_modes: PruneModes,
121 era_import_source: Option<EraImportSource>,
122 ) -> Self {
123 Self {
124 online: OnlineStages::new(
125 provider,
126 tip,
127 header_downloader,
128 body_downloader,
129 stages_config.clone(),
130 era_import_source,
131 ),
132 evm_config,
133 consensus,
134 stages_config,
135 prune_modes,
136 }
137 }
138}
139
140impl<P, H, B, E> DefaultStages<P, H, B, E>
141where
142 E: ConfigureEvm,
143 H: HeaderDownloader,
144 B: BodyDownloader,
145{
146 pub fn add_offline_stages<Provider>(
148 default_offline: StageSetBuilder<Provider>,
149 evm_config: E,
150 consensus: Arc<dyn FullConsensus<E::Primitives>>,
151 stages_config: StageConfig,
152 prune_modes: PruneModes,
153 ) -> StageSetBuilder<Provider>
154 where
155 OfflineStages<E>: StageSet<Provider>,
156 {
157 StageSetBuilder::default()
158 .add_set(default_offline)
159 .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
160 .add_stage(FinishStage)
161 }
162}
163
164impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
165where
166 P: HeaderSyncGapProvider + 'static,
167 H: HeaderDownloader + 'static,
168 B: BodyDownloader + 'static,
169 E: ConfigureEvm,
170 OnlineStages<P, H, B>: StageSet<Provider>,
171 OfflineStages<E>: StageSet<Provider>,
172{
173 fn builder(self) -> StageSetBuilder<Provider> {
174 Self::add_offline_stages(
175 self.online.builder(),
176 self.evm_config,
177 self.consensus,
178 self.stages_config.clone(),
179 self.prune_modes,
180 )
181 }
182}
183
/// The set of online stages: an optional [`EraStage`], the [`HeaderStage`] and the
/// [`BodyStage`].
///
/// The struct only stores the inputs; the stages are created when [`StageSet::builder`] is
/// called.
#[derive(Debug)]
pub struct OnlineStages<Provider, H, B>
where
    H: HeaderDownloader,
    B: BodyDownloader,
{
    /// Provider handed to the header stage.
    provider: Provider,
    /// Watch channel receiver handed to the header stage (presumably carries the hash of the
    /// sync target; confirm against `HeaderStage`).
    tip: watch::Receiver<B256>,

    /// Downloader handed to the header stage.
    header_downloader: H,
    /// Downloader handed to the body stage.
    body_downloader: B,
    /// Stage configuration; its `etl` section is passed to the era and header stages.
    stages_config: StageConfig,
    /// Optional ERA import source; the era stage is only added when this is `Some`.
    era_import_source: Option<EraImportSource>,
}
208
impl<Provider, H, B> OnlineStages<Provider, H, B>
where
    H: HeaderDownloader,
    B: BodyDownloader,
{
    /// Creates a new set of online stages from its parts.
    ///
    /// All arguments are stored as-is; no stage is constructed here.
    pub const fn new(
        provider: Provider,
        tip: watch::Receiver<B256>,
        header_downloader: H,
        body_downloader: B,
        stages_config: StageConfig,
        era_import_source: Option<EraImportSource>,
    ) -> Self {
        Self { provider, tip, header_downloader, body_downloader, stages_config, era_import_source }
    }
}
226
227impl<P, H, B> OnlineStages<P, H, B>
228where
229 P: HeaderSyncGapProvider + 'static,
230 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
231 B: BodyDownloader + 'static,
232{
233 pub fn builder_with_headers<Provider>(
235 headers: HeaderStage<P, H>,
236 body_downloader: B,
237 ) -> StageSetBuilder<Provider>
238 where
239 HeaderStage<P, H>: Stage<Provider>,
240 BodyStage<B>: Stage<Provider>,
241 {
242 StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
243 }
244
245 pub fn builder_with_bodies<Provider>(
247 bodies: BodyStage<B>,
248 provider: P,
249 tip: watch::Receiver<B256>,
250 header_downloader: H,
251 stages_config: StageConfig,
252 ) -> StageSetBuilder<Provider>
253 where
254 BodyStage<B>: Stage<Provider>,
255 HeaderStage<P, H>: Stage<Provider>,
256 {
257 StageSetBuilder::default()
258 .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
259 .add_stage(bodies)
260 }
261}
262
263impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
264where
265 P: HeaderSyncGapProvider + 'static,
266 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
267 B: BodyDownloader + 'static,
268 HeaderStage<P, H>: Stage<Provider>,
269 BodyStage<B>: Stage<Provider>,
270 EraStage<<B::Block as Block>::Header, <B::Block as Block>::Body, EraImportSource>:
271 Stage<Provider>,
272{
273 fn builder(self) -> StageSetBuilder<Provider> {
274 let mut builder = StageSetBuilder::default();
275
276 if self.era_import_source.is_some() {
277 builder = builder
278 .add_stage(EraStage::new(self.era_import_source, self.stages_config.etl.clone()));
279 }
280
281 builder
282 .add_stage(HeaderStage::new(
283 self.provider,
284 self.header_downloader,
285 self.tip,
286 self.stages_config.etl.clone(),
287 ))
288 .add_stage(BodyStage::new(self.body_downloader))
289 }
290}
291
/// The set of offline stages.
///
/// When built it contains, in order: the [`ExecutionStages`], an optional
/// [`PruneSenderRecoveryStage`], the [`HashingStages`], the [`HistoryIndexingStages`] and the
/// [`PruneStage`].
#[derive(Debug)]
#[non_exhaustive]
pub struct OfflineStages<E: ConfigureEvm> {
    /// EVM configuration handed to the execution stages.
    evm_config: E,
    /// Consensus implementation handed to the execution stages.
    consensus: Arc<dyn FullConsensus<E::Primitives>>,
    /// Configuration shared by the contained stages.
    stages_config: StageConfig,
    /// Prune configuration used by the pruning and history indexing stages.
    prune_modes: PruneModes,
}
313
impl<E: ConfigureEvm> OfflineStages<E> {
    /// Creates a new set of offline stages from its parts.
    ///
    /// All arguments are stored as-is; no stage is constructed here.
    pub const fn new(
        evm_config: E,
        consensus: Arc<dyn FullConsensus<E::Primitives>>,
        stages_config: StageConfig,
        prune_modes: PruneModes,
    ) -> Self {
        Self { evm_config, consensus, stages_config, prune_modes }
    }
}
325
326impl<E, Provider> StageSet<Provider> for OfflineStages<E>
327where
328 E: ConfigureEvm,
329 ExecutionStages<E>: StageSet<Provider>,
330 PruneSenderRecoveryStage: Stage<Provider>,
331 HashingStages: StageSet<Provider>,
332 HistoryIndexingStages: StageSet<Provider>,
333 PruneStage: Stage<Provider>,
334{
335 fn builder(self) -> StageSetBuilder<Provider> {
336 ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
337 .builder()
338 .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
340 PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
341 }))
342 .add_set(HashingStages { stages_config: self.stages_config.clone() })
343 .add_set(HistoryIndexingStages {
344 stages_config: self.stages_config.clone(),
345 prune_modes: self.prune_modes.clone(),
346 })
347 .add_stage(PruneStage::new(
350 self.prune_modes.clone(),
351 self.stages_config.prune.commit_threshold,
352 ))
353 }
354}
355
/// The set of execution stages: the [`SenderRecoveryStage`] followed by the [`ExecutionStage`].
#[derive(Debug)]
#[non_exhaustive]
pub struct ExecutionStages<E: ConfigureEvm> {
    /// EVM configuration handed to the execution stage.
    evm_config: E,
    /// Consensus implementation handed to the execution stage.
    consensus: Arc<dyn FullConsensus<E::Primitives>>,
    /// Stage configuration; the sender recovery and execution settings are read from it.
    stages_config: StageConfig,
}
367
impl<E: ConfigureEvm> ExecutionStages<E> {
    /// Creates a new set of execution stages from its parts.
    ///
    /// `executor_provider` is stored as the EVM configuration of the set.
    pub const fn new(
        executor_provider: E,
        consensus: Arc<dyn FullConsensus<E::Primitives>>,
        stages_config: StageConfig,
    ) -> Self {
        Self { evm_config: executor_provider, consensus, stages_config }
    }
}
378
379impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
380where
381 E: ConfigureEvm + 'static,
382 SenderRecoveryStage: Stage<Provider>,
383 ExecutionStage<E>: Stage<Provider>,
384{
385 fn builder(self) -> StageSetBuilder<Provider> {
386 StageSetBuilder::default()
387 .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
388 .add_stage(ExecutionStage::from_config(
389 self.evm_config,
390 self.consensus,
391 self.stages_config.execution,
392 self.stages_config.execution_external_clean_threshold(),
393 ))
394 }
395}
396
/// The set of hashing stages.
///
/// When built it contains, in order: [`MerkleStage::default_unwind`], the
/// [`AccountHashingStage`], the [`StorageHashingStage`], [`MerkleStage::new_execution`] and the
/// [`MerkleChangeSets`] stage.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HashingStages {
    /// Stage configuration; the hashing, ETL and merkle settings are read from it.
    stages_config: StageConfig,
}
411
412impl<Provider> StageSet<Provider> for HashingStages
413where
414 MerkleStage: Stage<Provider>,
415 AccountHashingStage: Stage<Provider>,
416 StorageHashingStage: Stage<Provider>,
417 MerkleChangeSets: Stage<Provider>,
418{
419 fn builder(self) -> StageSetBuilder<Provider> {
420 StageSetBuilder::default()
421 .add_stage(MerkleStage::default_unwind())
422 .add_stage(AccountHashingStage::new(
423 self.stages_config.account_hashing,
424 self.stages_config.etl.clone(),
425 ))
426 .add_stage(StorageHashingStage::new(
427 self.stages_config.storage_hashing,
428 self.stages_config.etl.clone(),
429 ))
430 .add_stage(MerkleStage::new_execution(
431 self.stages_config.merkle.rebuild_threshold,
432 self.stages_config.merkle.incremental_threshold,
433 ))
434 .add_stage(MerkleChangeSets::new())
435 }
436}
437
/// The set of history indexing stages: the [`TransactionLookupStage`], the
/// [`IndexStorageHistoryStage`] and the [`IndexAccountHistoryStage`].
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct HistoryIndexingStages {
    /// Stage configuration; the per-stage and ETL settings are read from it.
    stages_config: StageConfig,
    /// Prune configuration; each stage receives its matching prune mode.
    prune_modes: PruneModes,
}
447
448impl<Provider> StageSet<Provider> for HistoryIndexingStages
449where
450 TransactionLookupStage: Stage<Provider>,
451 IndexStorageHistoryStage: Stage<Provider>,
452 IndexAccountHistoryStage: Stage<Provider>,
453{
454 fn builder(self) -> StageSetBuilder<Provider> {
455 StageSetBuilder::default()
456 .add_stage(TransactionLookupStage::new(
457 self.stages_config.transaction_lookup,
458 self.stages_config.etl.clone(),
459 self.prune_modes.transaction_lookup,
460 ))
461 .add_stage(IndexStorageHistoryStage::new(
462 self.stages_config.index_storage_history,
463 self.stages_config.etl.clone(),
464 self.prune_modes.storage_history,
465 ))
466 .add_stage(IndexAccountHistoryStage::new(
467 self.stages_config.index_account_history,
468 self.stages_config.etl.clone(),
469 self.prune_modes.account_history,
470 ))
471 }
472}