Skip to main content

reth_engine_tree/tree/payload_processor/bal/
ordered_outputs.rs

1//! Ordering adapter for speculative BAL worker results.
2
3use super::{worker::BalWorkerOutput, BalExecutionError};
4use alloy_evm::block::BlockExecutionError;
5use crossbeam_channel::Receiver;
6
7/// Returns a blocking iterator over worker outputs in transaction order.
8///
9/// Workers may finish transactions out of order. This adapter buffers future-indexed outputs and
10/// yields each output only when every earlier transaction has been yielded. It owns no execution
11/// state and performs no canonical commit work.
12///
13/// Contract for callers:
14/// - each yielded `Ok` output has the next transaction index
15/// - worker errors are forwarded unchanged
16/// - closed channels before `total` outputs, out-of-bounds indices, and duplicate indices yield
17///   `Err`
18/// - after the first error, the iterator is exhausted
19///
20/// Callers that intentionally abort workers must stop polling this iterator after initiating
21/// abort. Before `total` outputs are yielded, channel closure is treated as an execution error.
22pub(super) fn ordered_worker_outputs<R>(
23    result_rx: &Receiver<Result<BalWorkerOutput<R>, BalExecutionError>>,
24    total: usize,
25) -> impl Iterator<Item = Result<BalWorkerOutput<R>, BalExecutionError>> + '_ {
26    OrderedWorkerOutputs::new(result_rx, total)
27}
28
29struct OrderedWorkerOutputs<'a, R> {
30    result_rx: &'a Receiver<Result<BalWorkerOutput<R>, BalExecutionError>>,
31    pending: Vec<Option<BalWorkerOutput<R>>>,
32    next: usize,
33    total: usize,
34    failed: bool,
35}
36
37impl<'a, R> OrderedWorkerOutputs<'a, R> {
38    fn new(
39        result_rx: &'a Receiver<Result<BalWorkerOutput<R>, BalExecutionError>>,
40        total: usize,
41    ) -> Self {
42        Self {
43            result_rx,
44            pending: (0..total).map(|_| None).collect(),
45            next: 0,
46            total,
47            failed: false,
48        }
49    }
50}
51
52impl<R> Iterator for OrderedWorkerOutputs<'_, R> {
53    type Item = Result<BalWorkerOutput<R>, BalExecutionError>;
54
55    fn next(&mut self) -> Option<Self::Item> {
56        if self.failed || self.next >= self.total {
57            return None;
58        }
59
60        loop {
61            if let Some(output) = self.pending[self.next].take() {
62                self.next += 1;
63                return Some(Ok(output));
64            }
65
66            let output = match self.result_rx.recv() {
67                Ok(Ok(output)) => output,
68                Ok(Err(err)) => {
69                    self.failed = true;
70                    return Some(Err(err));
71                }
72                Err(_) => {
73                    self.failed = true;
74                    return Some(Err(BalExecutionError::Evm(BlockExecutionError::msg(
75                        "BAL worker result channel closed while waiting for ordered outputs",
76                    ))));
77                }
78            };
79
80            let index = output.index;
81            if index >= self.total {
82                self.failed = true;
83                return Some(Err(BalExecutionError::Evm(BlockExecutionError::msg(
84                    "BAL worker returned out-of-bounds transaction index",
85                ))));
86            }
87
88            if index < self.next || self.pending[index].is_some() {
89                self.failed = true;
90                return Some(Err(BalExecutionError::Evm(BlockExecutionError::msg(
91                    "BAL worker returned duplicate transaction index",
92                ))));
93            }
94
95            self.pending[index] = Some(output);
96        }
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::Address;

    /// Item type flowing through the channels used by these tests.
    type TestItem = Result<BalWorkerOutput<u64>, BalExecutionError>;

    /// Builds a worker output for transaction `index` carrying `result`.
    fn output(index: usize, result: u64) -> BalWorkerOutput<u64> {
        BalWorkerOutput { index, signer: Address::ZERO, tx_gas_limit: 0, result }
    }

    /// Returns the receiver of a channel preloaded with `items` whose sender is already dropped.
    fn closed_channel(items: Vec<TestItem>) -> Receiver<TestItem> {
        let (tx, rx) = crossbeam_channel::unbounded();
        for item in items {
            tx.send(item).unwrap();
        }
        rx
    }

    /// Asserts that `result` is an error whose display text contains `text`.
    fn expect_err_contains<R>(result: Result<BalWorkerOutput<R>, BalExecutionError>, text: &str) {
        match result {
            Ok(_) => panic!("expected ordered worker output error"),
            Err(err) => {
                assert!(err.to_string().contains(text), "expected `{err}` to contain `{text}`");
            }
        }
    }

    #[test]
    fn yields_outputs_in_transaction_order() {
        let rx = closed_channel(vec![Ok(output(2, 20)), Ok(output(0, 0)), Ok(output(1, 10))]);

        let results: Vec<u64> = ordered_worker_outputs(&rx, 3)
            .map(|item| item.expect("ordered output").result)
            .collect();

        assert_eq!(results, vec![0, 10, 20]);
    }

    #[test]
    fn forwards_worker_errors_and_then_stops() {
        let worker_failure = BalExecutionError::Evm(BlockExecutionError::msg("worker failed"));
        let rx = closed_channel(vec![Err(worker_failure)]);

        let mut outputs = ordered_worker_outputs(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "worker failed");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_closed_channel_before_all_outputs_arrive() {
        let rx = closed_channel(Vec::new());

        let mut outputs = ordered_worker_outputs(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "waiting for ordered outputs");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_out_of_bounds_index() {
        let rx = closed_channel(vec![Ok(output(1, 10))]);

        let mut outputs = ordered_worker_outputs(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "out-of-bounds");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_duplicate_pending_index() {
        let rx = closed_channel(vec![Ok(output(1, 10)), Ok(output(1, 11))]);

        let mut outputs = ordered_worker_outputs(&rx, 2);

        expect_err_contains(outputs.next().expect("first item"), "duplicate");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_duplicate_already_yielded_index() {
        let rx = closed_channel(vec![Ok(output(0, 0)), Ok(output(0, 1))]);

        let mut outputs = ordered_worker_outputs(&rx, 2);

        let first = outputs.next().expect("first item").expect("first output");
        assert_eq!(first.result, 0);
        expect_err_contains(outputs.next().expect("second item"), "duplicate");
        assert!(outputs.next().is_none());
    }
}