reth_exex/backfill/
factory.rs

1use crate::BackfillJob;
2use std::{ops::RangeInclusive, time::Duration};
3
4use alloy_primitives::BlockNumber;
5use reth_node_api::FullNodeComponents;
6use reth_prune_types::PruneModes;
7use reth_stages_api::ExecutionStageThresholds;
8
9use super::stream::DEFAULT_PARALLELISM;
10
11/// Factory for creating new backfill jobs.
12#[derive(Debug, Clone)]
13pub struct BackfillJobFactory<E, P> {
14    executor: E,
15    provider: P,
16    prune_modes: PruneModes,
17    thresholds: ExecutionStageThresholds,
18    stream_parallelism: usize,
19}
20
21impl<E, P> BackfillJobFactory<E, P> {
22    /// Creates a new [`BackfillJobFactory`].
23    pub fn new(executor: E, provider: P) -> Self {
24        Self {
25            executor,
26            provider,
27            prune_modes: PruneModes::none(),
28            thresholds: ExecutionStageThresholds {
29                // Default duration for a database transaction to be considered long-lived is
30                // 60 seconds, so we limit the backfill job to the half of it to be sure we finish
31                // before the warning is logged.
32                //
33                // See `reth_db::implementation::mdbx::tx::LONG_TRANSACTION_DURATION`.
34                max_duration: Some(Duration::from_secs(30)),
35                ..Default::default()
36            },
37            stream_parallelism: DEFAULT_PARALLELISM,
38        }
39    }
40
41    /// Sets the prune modes
42    pub fn with_prune_modes(mut self, prune_modes: PruneModes) -> Self {
43        self.prune_modes = prune_modes;
44        self
45    }
46
47    /// Sets the thresholds
48    pub const fn with_thresholds(mut self, thresholds: ExecutionStageThresholds) -> Self {
49        self.thresholds = thresholds;
50        self
51    }
52
53    /// Sets the stream parallelism.
54    ///
55    /// Configures the [`StreamBackfillJob`](super::stream::StreamBackfillJob) created via
56    /// [`BackfillJob::into_stream`].
57    pub const fn with_stream_parallelism(mut self, stream_parallelism: usize) -> Self {
58        self.stream_parallelism = stream_parallelism;
59        self
60    }
61}
62
63impl<E: Clone, P: Clone> BackfillJobFactory<E, P> {
64    /// Creates a new backfill job for the given range.
65    pub fn backfill(&self, range: RangeInclusive<BlockNumber>) -> BackfillJob<E, P> {
66        BackfillJob {
67            executor: self.executor.clone(),
68            provider: self.provider.clone(),
69            prune_modes: self.prune_modes.clone(),
70            range,
71            thresholds: self.thresholds.clone(),
72            stream_parallelism: self.stream_parallelism,
73        }
74    }
75}
76
77impl BackfillJobFactory<(), ()> {
78    /// Creates a new [`BackfillJobFactory`] from [`FullNodeComponents`].
79    pub fn new_from_components<Node: FullNodeComponents>(
80        components: Node,
81    ) -> BackfillJobFactory<Node::Executor, Node::Provider> {
82        BackfillJobFactory::<_, _>::new(
83            components.block_executor().clone(),
84            components.provider().clone(),
85        )
86    }
87}