1use crate::{
40 stages::{
41 AccountHashingStage, BodyStage, EraImportSource, EraStage, ExecutionStage, FinishStage,
42 HeaderStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage,
43 PruneSenderRecoveryStage, PruneStage, SenderRecoveryStage, StorageHashingStage,
44 TransactionLookupStage,
45 },
46 StageSet, StageSetBuilder,
47};
48use alloy_primitives::B256;
49use reth_config::config::StageConfig;
50use reth_consensus::{ConsensusError, FullConsensus};
51use reth_evm::ConfigureEvm;
52use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
53use reth_primitives_traits::{Block, NodePrimitives};
54use reth_provider::HeaderSyncGapProvider;
55use reth_prune_types::PruneModes;
56use reth_stages_api::Stage;
57use std::{ops::Not, sync::Arc};
58use tokio::sync::watch;
59
60#[derive(Debug)]
84pub struct DefaultStages<Provider, H, B, E>
85where
86 H: HeaderDownloader,
87 B: BodyDownloader,
88 E: ConfigureEvm,
89{
90 online: OnlineStages<Provider, H, B>,
92 evm_config: E,
94 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
96 stages_config: StageConfig,
98 prune_modes: PruneModes,
100}
101
102impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
103where
104 H: HeaderDownloader,
105 B: BodyDownloader,
106 E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
107{
108 #[expect(clippy::too_many_arguments)]
110 pub fn new(
111 provider: Provider,
112 tip: watch::Receiver<B256>,
113 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
114 header_downloader: H,
115 body_downloader: B,
116 evm_config: E,
117 stages_config: StageConfig,
118 prune_modes: PruneModes,
119 era_import_source: Option<EraImportSource>,
120 ) -> Self {
121 Self {
122 online: OnlineStages::new(
123 provider,
124 tip,
125 header_downloader,
126 body_downloader,
127 stages_config.clone(),
128 era_import_source,
129 ),
130 evm_config,
131 consensus,
132 stages_config,
133 prune_modes,
134 }
135 }
136}
137
138impl<P, H, B, E> DefaultStages<P, H, B, E>
139where
140 E: ConfigureEvm,
141 H: HeaderDownloader,
142 B: BodyDownloader,
143{
144 pub fn add_offline_stages<Provider>(
146 default_offline: StageSetBuilder<Provider>,
147 evm_config: E,
148 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
149 stages_config: StageConfig,
150 prune_modes: PruneModes,
151 ) -> StageSetBuilder<Provider>
152 where
153 OfflineStages<E>: StageSet<Provider>,
154 {
155 StageSetBuilder::default()
156 .add_set(default_offline)
157 .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
158 .add_stage(FinishStage)
159 }
160}
161
162impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
163where
164 P: HeaderSyncGapProvider + 'static,
165 H: HeaderDownloader + 'static,
166 B: BodyDownloader + 'static,
167 E: ConfigureEvm,
168 OnlineStages<P, H, B>: StageSet<Provider>,
169 OfflineStages<E>: StageSet<Provider>,
170{
171 fn builder(self) -> StageSetBuilder<Provider> {
172 Self::add_offline_stages(
173 self.online.builder(),
174 self.evm_config,
175 self.consensus,
176 self.stages_config.clone(),
177 self.prune_modes,
178 )
179 }
180}
181
182#[derive(Debug)]
187pub struct OnlineStages<Provider, H, B>
188where
189 H: HeaderDownloader,
190 B: BodyDownloader,
191{
192 provider: Provider,
194 tip: watch::Receiver<B256>,
196
197 header_downloader: H,
199 body_downloader: B,
201 stages_config: StageConfig,
203 era_import_source: Option<EraImportSource>,
205}
206
207impl<Provider, H, B> OnlineStages<Provider, H, B>
208where
209 H: HeaderDownloader,
210 B: BodyDownloader,
211{
212 pub const fn new(
214 provider: Provider,
215 tip: watch::Receiver<B256>,
216 header_downloader: H,
217 body_downloader: B,
218 stages_config: StageConfig,
219 era_import_source: Option<EraImportSource>,
220 ) -> Self {
221 Self { provider, tip, header_downloader, body_downloader, stages_config, era_import_source }
222 }
223}
224
225impl<P, H, B> OnlineStages<P, H, B>
226where
227 P: HeaderSyncGapProvider + 'static,
228 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
229 B: BodyDownloader + 'static,
230{
231 pub fn builder_with_headers<Provider>(
233 headers: HeaderStage<P, H>,
234 body_downloader: B,
235 ) -> StageSetBuilder<Provider>
236 where
237 HeaderStage<P, H>: Stage<Provider>,
238 BodyStage<B>: Stage<Provider>,
239 {
240 StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
241 }
242
243 pub fn builder_with_bodies<Provider>(
245 bodies: BodyStage<B>,
246 provider: P,
247 tip: watch::Receiver<B256>,
248 header_downloader: H,
249 stages_config: StageConfig,
250 ) -> StageSetBuilder<Provider>
251 where
252 BodyStage<B>: Stage<Provider>,
253 HeaderStage<P, H>: Stage<Provider>,
254 {
255 StageSetBuilder::default()
256 .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
257 .add_stage(bodies)
258 }
259}
260
261impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
262where
263 P: HeaderSyncGapProvider + 'static,
264 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
265 B: BodyDownloader + 'static,
266 HeaderStage<P, H>: Stage<Provider>,
267 BodyStage<B>: Stage<Provider>,
268 EraStage<<B::Block as Block>::Header, <B::Block as Block>::Body, EraImportSource>:
269 Stage<Provider>,
270{
271 fn builder(self) -> StageSetBuilder<Provider> {
272 StageSetBuilder::default()
273 .add_stage(EraStage::new(self.era_import_source, self.stages_config.etl.clone()))
274 .add_stage(HeaderStage::new(
275 self.provider,
276 self.header_downloader,
277 self.tip,
278 self.stages_config.etl.clone(),
279 ))
280 .add_stage(BodyStage::new(self.body_downloader))
281 }
282}
283
284#[derive(Debug)]
294#[non_exhaustive]
295pub struct OfflineStages<E: ConfigureEvm> {
296 evm_config: E,
298 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
300 stages_config: StageConfig,
302 prune_modes: PruneModes,
304}
305
306impl<E: ConfigureEvm> OfflineStages<E> {
307 pub const fn new(
309 evm_config: E,
310 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
311 stages_config: StageConfig,
312 prune_modes: PruneModes,
313 ) -> Self {
314 Self { evm_config, consensus, stages_config, prune_modes }
315 }
316}
317
318impl<E, Provider> StageSet<Provider> for OfflineStages<E>
319where
320 E: ConfigureEvm,
321 ExecutionStages<E>: StageSet<Provider>,
322 PruneSenderRecoveryStage: Stage<Provider>,
323 HashingStages: StageSet<Provider>,
324 HistoryIndexingStages: StageSet<Provider>,
325 PruneStage: Stage<Provider>,
326{
327 fn builder(self) -> StageSetBuilder<Provider> {
328 ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
329 .builder()
330 .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
332 PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
333 }))
334 .add_set(HashingStages { stages_config: self.stages_config.clone() })
335 .add_set(HistoryIndexingStages {
336 stages_config: self.stages_config.clone(),
337 prune_modes: self.prune_modes.clone(),
338 })
339 .add_stage_opt(self.prune_modes.is_empty().not().then(|| {
341 PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
344 }))
345 }
346}
347
348#[derive(Debug)]
350#[non_exhaustive]
351pub struct ExecutionStages<E: ConfigureEvm> {
352 evm_config: E,
354 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
356 stages_config: StageConfig,
358}
359
360impl<E: ConfigureEvm> ExecutionStages<E> {
361 pub const fn new(
363 executor_provider: E,
364 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
365 stages_config: StageConfig,
366 ) -> Self {
367 Self { evm_config: executor_provider, consensus, stages_config }
368 }
369}
370
371impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
372where
373 E: ConfigureEvm + 'static,
374 SenderRecoveryStage: Stage<Provider>,
375 ExecutionStage<E>: Stage<Provider>,
376{
377 fn builder(self) -> StageSetBuilder<Provider> {
378 StageSetBuilder::default()
379 .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
380 .add_stage(ExecutionStage::from_config(
381 self.evm_config,
382 self.consensus,
383 self.stages_config.execution,
384 self.stages_config.execution_external_clean_threshold(),
385 ))
386 }
387}
388
389#[derive(Debug, Default)]
391#[non_exhaustive]
392pub struct HashingStages {
393 stages_config: StageConfig,
395}
396
397impl<Provider> StageSet<Provider> for HashingStages
398where
399 MerkleStage: Stage<Provider>,
400 AccountHashingStage: Stage<Provider>,
401 StorageHashingStage: Stage<Provider>,
402{
403 fn builder(self) -> StageSetBuilder<Provider> {
404 StageSetBuilder::default()
405 .add_stage(MerkleStage::default_unwind())
406 .add_stage(AccountHashingStage::new(
407 self.stages_config.account_hashing,
408 self.stages_config.etl.clone(),
409 ))
410 .add_stage(StorageHashingStage::new(
411 self.stages_config.storage_hashing,
412 self.stages_config.etl.clone(),
413 ))
414 .add_stage(MerkleStage::new_execution(
415 self.stages_config.merkle.rebuild_threshold,
416 self.stages_config.merkle.incremental_threshold,
417 ))
418 }
419}
420
421#[derive(Debug, Default)]
423#[non_exhaustive]
424pub struct HistoryIndexingStages {
425 stages_config: StageConfig,
427 prune_modes: PruneModes,
429}
430
431impl<Provider> StageSet<Provider> for HistoryIndexingStages
432where
433 TransactionLookupStage: Stage<Provider>,
434 IndexStorageHistoryStage: Stage<Provider>,
435 IndexAccountHistoryStage: Stage<Provider>,
436{
437 fn builder(self) -> StageSetBuilder<Provider> {
438 StageSetBuilder::default()
439 .add_stage(TransactionLookupStage::new(
440 self.stages_config.transaction_lookup,
441 self.stages_config.etl.clone(),
442 self.prune_modes.transaction_lookup,
443 ))
444 .add_stage(IndexStorageHistoryStage::new(
445 self.stages_config.index_storage_history,
446 self.stages_config.etl.clone(),
447 self.prune_modes.storage_history,
448 ))
449 .add_stage(IndexAccountHistoryStage::new(
450 self.stages_config.index_account_history,
451 self.stages_config.etl.clone(),
452 self.prune_modes.account_history,
453 ))
454 }
455}