reth_stages_api/pipeline/
set.rs

1use crate::{Stage, StageId};
2use std::{
3    collections::HashMap,
4    fmt::{Debug, Formatter},
5};
6
7/// Combines multiple [`Stage`]s into a single unit.
8///
9/// A [`StageSet`] is a logical chunk of stages that depend on each other. It is up to the
10/// individual stage sets to determine what kind of configuration they expose.
11///
12/// Individual stages in the set can be added, removed and overridden using [`StageSetBuilder`].
13pub trait StageSet<Provider>: Sized {
14    /// Configures the stages in the set.
15    fn builder(self) -> StageSetBuilder<Provider>;
16
17    /// Overrides the given [`Stage`], if it is in this set.
18    ///
19    /// # Panics
20    ///
21    /// Panics if the [`Stage`] is not in this set.
22    fn set<S: Stage<Provider> + 'static>(self, stage: S) -> StageSetBuilder<Provider> {
23        self.builder().set(stage)
24    }
25}
26
27struct StageEntry<Provider> {
28    stage: Box<dyn Stage<Provider>>,
29    enabled: bool,
30}
31
32impl<Provider> Debug for StageEntry<Provider> {
33    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("StageEntry")
35            .field("stage", &self.stage.id())
36            .field("enabled", &self.enabled)
37            .finish()
38    }
39}
40
41/// Helper to create and configure a [`StageSet`].
42///
43/// The builder provides ordering helpers to ensure that stages that depend on each other are added
44/// to the final sync pipeline before/after their dependencies.
45///
46/// Stages inside the set can be disabled, enabled, overridden and reordered.
47pub struct StageSetBuilder<Provider> {
48    stages: HashMap<StageId, StageEntry<Provider>>,
49    order: Vec<StageId>,
50}
51
52impl<Provider> Default for StageSetBuilder<Provider> {
53    fn default() -> Self {
54        Self { stages: HashMap::default(), order: Vec::new() }
55    }
56}
57
58impl<Provider> Debug for StageSetBuilder<Provider> {
59    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("StageSetBuilder")
61            .field("stages", &self.stages)
62            .field("order", &self.order)
63            .finish()
64    }
65}
66
67impl<Provider> StageSetBuilder<Provider> {
68    fn index_of(&self, stage_id: StageId) -> usize {
69        let index = self.order.iter().position(|&id| id == stage_id);
70
71        index.unwrap_or_else(|| panic!("Stage does not exist in set: {stage_id}"))
72    }
73
74    fn upsert_stage_state(&mut self, stage: Box<dyn Stage<Provider>>, added_at_index: usize) {
75        let stage_id = stage.id();
76        if self.stages.insert(stage.id(), StageEntry { stage, enabled: true }).is_some() &&
77            let Some(to_remove) = self
78                .order
79                .iter()
80                .enumerate()
81                .find(|(i, id)| *i != added_at_index && **id == stage_id)
82                .map(|(i, _)| i)
83        {
84            self.order.remove(to_remove);
85        }
86    }
87
88    /// Overrides the given [`Stage`], if it is in this set.
89    ///
90    /// # Panics
91    ///
92    /// Panics if the [`Stage`] is not in this set.
93    pub fn set<S: Stage<Provider> + 'static>(mut self, stage: S) -> Self {
94        let entry = self
95            .stages
96            .get_mut(&stage.id())
97            .unwrap_or_else(|| panic!("Stage does not exist in set: {}", stage.id()));
98        entry.stage = Box::new(stage);
99        self
100    }
101
102    /// Returns iterator over the stages in this set,
103    /// In the same order they would be executed in the pipeline.
104    pub fn stages(&self) -> impl Iterator<Item = StageId> + '_ {
105        self.order.iter().copied()
106    }
107
108    /// Replaces a stage with the given ID with a new stage.
109    ///
110    /// If the new stage has a different ID,
111    /// it will maintain the original stage's position in the execution order.
112    pub fn replace<S: Stage<Provider> + 'static>(mut self, stage_id: StageId, stage: S) -> Self {
113        self.stages
114            .get(&stage_id)
115            .unwrap_or_else(|| panic!("Stage does not exist in set: {stage_id}"));
116
117        if stage.id() == stage_id {
118            return self.set(stage);
119        }
120        let index = self.index_of(stage_id);
121        self.stages.remove(&stage_id);
122        self.order[index] = stage.id();
123        self.upsert_stage_state(Box::new(stage), index);
124        self
125    }
126
127    /// Adds the given [`Stage`] at the end of this set.
128    ///
129    /// If the stage was already in the group, it is removed from its previous place.
130    pub fn add_stage<S: Stage<Provider> + 'static>(mut self, stage: S) -> Self {
131        let target_index = self.order.len();
132        self.order.push(stage.id());
133        self.upsert_stage_state(Box::new(stage), target_index);
134        self
135    }
136
137    /// Adds the given [`Stage`] at the end of this set if it's [`Some`].
138    ///
139    /// If the stage was already in the group, it is removed from its previous place.
140    pub fn add_stage_opt<S: Stage<Provider> + 'static>(self, stage: Option<S>) -> Self {
141        if let Some(stage) = stage {
142            self.add_stage(stage)
143        } else {
144            self
145        }
146    }
147
148    /// Adds the given [`StageSet`] to the end of this set.
149    ///
150    /// If a stage is in both sets, it is removed from its previous place in this set. Because of
151    /// this, it is advisable to merge sets first and re-order stages after if needed.
152    pub fn add_set<Set: StageSet<Provider>>(mut self, set: Set) -> Self {
153        for stage in set.builder().build() {
154            let target_index = self.order.len();
155            self.order.push(stage.id());
156            self.upsert_stage_state(stage, target_index);
157        }
158        self
159    }
160
161    /// Adds the given [`Stage`] before the stage with the given [`StageId`].
162    ///
163    /// If the stage was already in the group, it is removed from its previous place.
164    ///
165    /// # Panics
166    ///
167    /// Panics if the dependency stage is not in this set.
168    pub fn add_before<S: Stage<Provider> + 'static>(mut self, stage: S, before: StageId) -> Self {
169        let target_index = self.index_of(before);
170        self.order.insert(target_index, stage.id());
171        self.upsert_stage_state(Box::new(stage), target_index);
172        self
173    }
174
175    /// Adds the given [`Stage`] after the stage with the given [`StageId`].
176    ///
177    /// If the stage was already in the group, it is removed from its previous place.
178    ///
179    /// # Panics
180    ///
181    /// Panics if the dependency stage is not in this set.
182    pub fn add_after<S: Stage<Provider> + 'static>(mut self, stage: S, after: StageId) -> Self {
183        let target_index = self.index_of(after) + 1;
184        self.order.insert(target_index, stage.id());
185        self.upsert_stage_state(Box::new(stage), target_index);
186        self
187    }
188
189    /// Enables the given stage.
190    ///
191    /// All stages within a [`StageSet`] are enabled by default.
192    ///
193    /// # Panics
194    ///
195    /// Panics if the stage is not in this set.
196    pub fn enable(mut self, stage_id: StageId) -> Self {
197        let entry =
198            self.stages.get_mut(&stage_id).expect("Cannot enable a stage that is not in the set.");
199        entry.enabled = true;
200        self
201    }
202
203    /// Disables the given stage.
204    ///
205    /// The disabled [`Stage`] keeps its place in the set, so it can be used for ordering with
206    /// [`StageSetBuilder::add_before`] or [`StageSetBuilder::add_after`], or it can be re-enabled.
207    ///
208    /// All stages within a [`StageSet`] are enabled by default.
209    ///
210    /// # Panics
211    ///
212    /// Panics if the stage is not in this set.
213    #[track_caller]
214    pub fn disable(mut self, stage_id: StageId) -> Self {
215        let entry = self
216            .stages
217            .get_mut(&stage_id)
218            .unwrap_or_else(|| panic!("Cannot disable a stage that is not in the set: {stage_id}"));
219        entry.enabled = false;
220        self
221    }
222
223    /// Disables all given stages. See [`disable`](Self::disable).
224    ///
225    /// If any of the stages is not in this set, it is ignored.
226    pub fn disable_all(mut self, stages: &[StageId]) -> Self {
227        for stage_id in stages {
228            let Some(entry) = self.stages.get_mut(stage_id) else { continue };
229            entry.enabled = false;
230        }
231        self
232    }
233
234    /// Disables the given stage if the given closure returns true.
235    ///
236    /// See [`Self::disable`]
237    #[track_caller]
238    pub fn disable_if<F>(self, stage_id: StageId, f: F) -> Self
239    where
240        F: FnOnce() -> bool,
241    {
242        if f() {
243            return self.disable(stage_id)
244        }
245        self
246    }
247
248    /// Disables all given stages if the given closure returns true.
249    ///
250    /// See [`Self::disable`]
251    #[track_caller]
252    pub fn disable_all_if<F>(self, stages: &[StageId], f: F) -> Self
253    where
254        F: FnOnce() -> bool,
255    {
256        if f() {
257            return self.disable_all(stages)
258        }
259        self
260    }
261
262    /// Consumes the builder and returns the contained [`Stage`]s in the order specified.
263    pub fn build(mut self) -> Vec<Box<dyn Stage<Provider>>> {
264        let mut stages = Vec::new();
265        for id in &self.order {
266            if let Some(entry) = self.stages.remove(id) &&
267                entry.enabled
268            {
269                stages.push(entry.stage);
270            }
271        }
272        stages
273    }
274}
275
276impl<Provider> StageSet<Provider> for StageSetBuilder<Provider> {
277    fn builder(self) -> Self {
278        self
279    }
280}