reth_exex/backfill/
factory.rs
1use crate::BackfillJob;
2use std::{ops::RangeInclusive, time::Duration};
3
4use alloy_primitives::BlockNumber;
5use reth_node_api::FullNodeComponents;
6use reth_prune_types::PruneModes;
7use reth_stages_api::ExecutionStageThresholds;
8
9use super::stream::DEFAULT_PARALLELISM;
10
11#[derive(Debug, Clone)]
13pub struct BackfillJobFactory<E, P> {
14 executor: E,
15 provider: P,
16 prune_modes: PruneModes,
17 thresholds: ExecutionStageThresholds,
18 stream_parallelism: usize,
19}
20
21impl<E, P> BackfillJobFactory<E, P> {
22 pub fn new(executor: E, provider: P) -> Self {
24 Self {
25 executor,
26 provider,
27 prune_modes: PruneModes::none(),
28 thresholds: ExecutionStageThresholds {
29 max_duration: Some(Duration::from_secs(30)),
35 ..Default::default()
36 },
37 stream_parallelism: DEFAULT_PARALLELISM,
38 }
39 }
40
41 pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
43 self.prune_modes = prune_modes;
44 self
45 }
46
47 pub const fn with_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
49 self.thresholds = thresholds;
50 self
51 }
52
53 pub const fn with_stream_parallelism(mut self, stream_parallelism: usize) -> Self {
58 self.stream_parallelism = stream_parallelism;
59 self
60 }
61}
62
63impl<E: Clone, P: Clone> BackfillJobFactory<E, P> {
64 pub fn backfill(&self, range: RangeInclusive<BlockNumber>) -> BackfillJob<E, P> {
66 BackfillJob {
67 executor: self.executor.clone(),
68 provider: self.provider.clone(),
69 prune_modes: self.prune_modes.clone(),
70 range,
71 thresholds: self.thresholds.clone(),
72 stream_parallelism: self.stream_parallelism,
73 }
74 }
75}
76
77impl BackfillJobFactory<(), ()> {
78 pub fn new_from_components<Node: FullNodeComponents>(
80 components: Node,
81 ) -> BackfillJobFactory<Node::Executor, Node::Provider> {
82 BackfillJobFactory::<_, _>::new(
83 components.block_executor().clone(),
84 components.provider().clone(),
85 )
86 }
87}