1use crate::{
40 stages::{
41 AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
42 IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, PruneSenderRecoveryStage,
43 PruneStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage,
44 },
45 StageSet, StageSetBuilder,
46};
47use alloy_primitives::B256;
48use reth_config::config::StageConfig;
49use reth_consensus::{Consensus, ConsensusError, FullConsensus};
50use reth_evm::execute::BlockExecutorProvider;
51use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
52use reth_primitives_traits::{Block, NodePrimitives};
53use reth_provider::HeaderSyncGapProvider;
54use reth_prune_types::PruneModes;
55use reth_stages_api::Stage;
56use std::{ops::Not, sync::Arc};
57use tokio::sync::watch;
58
59#[derive(Debug)]
83pub struct DefaultStages<Provider, H, B, E>
84where
85 H: HeaderDownloader,
86 B: BodyDownloader,
87 E: BlockExecutorProvider,
88{
89 online: OnlineStages<Provider, H, B>,
91 executor_provider: E,
93 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
95 stages_config: StageConfig,
97 prune_modes: PruneModes,
99}
100
101impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
102where
103 H: HeaderDownloader,
104 B: BodyDownloader,
105 E: BlockExecutorProvider<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
106{
107 #[allow(clippy::too_many_arguments)]
109 pub fn new(
110 provider: Provider,
111 tip: watch::Receiver<B256>,
112 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
113 header_downloader: H,
114 body_downloader: B,
115 executor_provider: E,
116 stages_config: StageConfig,
117 prune_modes: PruneModes,
118 ) -> Self {
119 Self {
120 online: OnlineStages::new(
121 provider,
122 tip,
123 consensus.clone().as_consensus(),
124 header_downloader,
125 body_downloader,
126 stages_config.clone(),
127 ),
128 executor_provider,
129 consensus,
130 stages_config,
131 prune_modes,
132 }
133 }
134}
135
136impl<P, H, B, E> DefaultStages<P, H, B, E>
137where
138 E: BlockExecutorProvider,
139 H: HeaderDownloader,
140 B: BodyDownloader,
141{
142 pub fn add_offline_stages<Provider>(
144 default_offline: StageSetBuilder<Provider>,
145 executor_provider: E,
146 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
147 stages_config: StageConfig,
148 prune_modes: PruneModes,
149 ) -> StageSetBuilder<Provider>
150 where
151 OfflineStages<E>: StageSet<Provider>,
152 {
153 StageSetBuilder::default()
154 .add_set(default_offline)
155 .add_set(OfflineStages::new(executor_provider, consensus, stages_config, prune_modes))
156 .add_stage(FinishStage)
157 }
158}
159
160impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
161where
162 P: HeaderSyncGapProvider + 'static,
163 H: HeaderDownloader + 'static,
164 B: BodyDownloader + 'static,
165 E: BlockExecutorProvider,
166 OnlineStages<P, H, B>: StageSet<Provider>,
167 OfflineStages<E>: StageSet<Provider>,
168{
169 fn builder(self) -> StageSetBuilder<Provider> {
170 Self::add_offline_stages(
171 self.online.builder(),
172 self.executor_provider,
173 self.consensus,
174 self.stages_config.clone(),
175 self.prune_modes,
176 )
177 }
178}
179
180#[derive(Debug)]
185pub struct OnlineStages<Provider, H, B>
186where
187 H: HeaderDownloader,
188 B: BodyDownloader,
189{
190 provider: Provider,
192 tip: watch::Receiver<B256>,
194 consensus: Arc<dyn Consensus<B::Block, Error = ConsensusError>>,
196 header_downloader: H,
198 body_downloader: B,
200 stages_config: StageConfig,
202}
203
204impl<Provider, H, B> OnlineStages<Provider, H, B>
205where
206 H: HeaderDownloader,
207 B: BodyDownloader,
208{
209 pub fn new(
211 provider: Provider,
212 tip: watch::Receiver<B256>,
213 consensus: Arc<dyn Consensus<B::Block, Error = ConsensusError>>,
214 header_downloader: H,
215 body_downloader: B,
216 stages_config: StageConfig,
217 ) -> Self {
218 Self { provider, tip, consensus, header_downloader, body_downloader, stages_config }
219 }
220}
221
222impl<P, H, B> OnlineStages<P, H, B>
223where
224 P: HeaderSyncGapProvider + 'static,
225 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
226 B: BodyDownloader + 'static,
227{
228 pub fn builder_with_headers<Provider>(
230 headers: HeaderStage<P, H>,
231 body_downloader: B,
232 ) -> StageSetBuilder<Provider>
233 where
234 HeaderStage<P, H>: Stage<Provider>,
235 BodyStage<B>: Stage<Provider>,
236 {
237 StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
238 }
239
240 pub fn builder_with_bodies<Provider>(
242 bodies: BodyStage<B>,
243 provider: P,
244 tip: watch::Receiver<B256>,
245 header_downloader: H,
246 consensus: Arc<dyn Consensus<B::Block, Error = ConsensusError>>,
247 stages_config: StageConfig,
248 ) -> StageSetBuilder<Provider>
249 where
250 BodyStage<B>: Stage<Provider>,
251 HeaderStage<P, H>: Stage<Provider>,
252 {
253 StageSetBuilder::default()
254 .add_stage(HeaderStage::new(
255 provider,
256 header_downloader,
257 tip,
258 consensus.clone().as_header_validator(),
259 stages_config.etl,
260 ))
261 .add_stage(bodies)
262 }
263}
264
265impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
266where
267 P: HeaderSyncGapProvider + 'static,
268 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
269 B: BodyDownloader + 'static,
270 HeaderStage<P, H>: Stage<Provider>,
271 BodyStage<B>: Stage<Provider>,
272{
273 fn builder(self) -> StageSetBuilder<Provider> {
274 StageSetBuilder::default()
275 .add_stage(HeaderStage::new(
276 self.provider,
277 self.header_downloader,
278 self.tip,
279 self.consensus.clone().as_header_validator(),
280 self.stages_config.etl.clone(),
281 ))
282 .add_stage(BodyStage::new(self.body_downloader))
283 }
284}
285
286#[derive(Debug)]
296#[non_exhaustive]
297pub struct OfflineStages<E: BlockExecutorProvider> {
298 executor_provider: E,
300 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
302 stages_config: StageConfig,
304 prune_modes: PruneModes,
306}
307
308impl<E: BlockExecutorProvider> OfflineStages<E> {
309 pub const fn new(
311 executor_provider: E,
312 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
313 stages_config: StageConfig,
314 prune_modes: PruneModes,
315 ) -> Self {
316 Self { executor_provider, consensus, stages_config, prune_modes }
317 }
318}
319
320impl<E, Provider> StageSet<Provider> for OfflineStages<E>
321where
322 E: BlockExecutorProvider,
323 ExecutionStages<E>: StageSet<Provider>,
324 PruneSenderRecoveryStage: Stage<Provider>,
325 HashingStages: StageSet<Provider>,
326 HistoryIndexingStages: StageSet<Provider>,
327 PruneStage: Stage<Provider>,
328{
329 fn builder(self) -> StageSetBuilder<Provider> {
330 ExecutionStages::new(self.executor_provider, self.consensus, self.stages_config.clone())
331 .builder()
332 .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
334 PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
335 }))
336 .add_set(HashingStages { stages_config: self.stages_config.clone() })
337 .add_set(HistoryIndexingStages {
338 stages_config: self.stages_config.clone(),
339 prune_modes: self.prune_modes.clone(),
340 })
341 .add_stage_opt(self.prune_modes.is_empty().not().then(|| {
343 PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
346 }))
347 }
348}
349
350#[derive(Debug)]
352#[non_exhaustive]
353pub struct ExecutionStages<E: BlockExecutorProvider> {
354 executor_provider: E,
356 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
358 stages_config: StageConfig,
360}
361
362impl<E: BlockExecutorProvider> ExecutionStages<E> {
363 pub const fn new(
365 executor_provider: E,
366 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
367 stages_config: StageConfig,
368 ) -> Self {
369 Self { executor_provider, consensus, stages_config }
370 }
371}
372
373impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
374where
375 E: BlockExecutorProvider,
376 SenderRecoveryStage: Stage<Provider>,
377 ExecutionStage<E>: Stage<Provider>,
378{
379 fn builder(self) -> StageSetBuilder<Provider> {
380 StageSetBuilder::default()
381 .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
382 .add_stage(ExecutionStage::from_config(
383 self.executor_provider,
384 self.consensus,
385 self.stages_config.execution,
386 self.stages_config.execution_external_clean_threshold(),
387 ))
388 }
389}
390
391#[derive(Debug, Default)]
393#[non_exhaustive]
394pub struct HashingStages {
395 stages_config: StageConfig,
397}
398
399impl<Provider> StageSet<Provider> for HashingStages
400where
401 MerkleStage: Stage<Provider>,
402 AccountHashingStage: Stage<Provider>,
403 StorageHashingStage: Stage<Provider>,
404{
405 fn builder(self) -> StageSetBuilder<Provider> {
406 StageSetBuilder::default()
407 .add_stage(MerkleStage::default_unwind())
408 .add_stage(AccountHashingStage::new(
409 self.stages_config.account_hashing,
410 self.stages_config.etl.clone(),
411 ))
412 .add_stage(StorageHashingStage::new(
413 self.stages_config.storage_hashing,
414 self.stages_config.etl.clone(),
415 ))
416 .add_stage(MerkleStage::new_execution(self.stages_config.merkle.clean_threshold))
417 }
418}
419
420#[derive(Debug, Default)]
422#[non_exhaustive]
423pub struct HistoryIndexingStages {
424 stages_config: StageConfig,
426 prune_modes: PruneModes,
428}
429
430impl<Provider> StageSet<Provider> for HistoryIndexingStages
431where
432 TransactionLookupStage: Stage<Provider>,
433 IndexStorageHistoryStage: Stage<Provider>,
434 IndexAccountHistoryStage: Stage<Provider>,
435{
436 fn builder(self) -> StageSetBuilder<Provider> {
437 StageSetBuilder::default()
438 .add_stage(TransactionLookupStage::new(
439 self.stages_config.transaction_lookup,
440 self.stages_config.etl.clone(),
441 self.prune_modes.transaction_lookup,
442 ))
443 .add_stage(IndexStorageHistoryStage::new(
444 self.stages_config.index_storage_history,
445 self.stages_config.etl.clone(),
446 self.prune_modes.account_history,
447 ))
448 .add_stage(IndexAccountHistoryStage::new(
449 self.stages_config.index_account_history,
450 self.stages_config.etl.clone(),
451 self.prune_modes.storage_history,
452 ))
453 }
454}