Skip to main content

reth_engine_tree/tree/payload_processor/bal/
ordered_outputs.rs

1//! Ordering adapter for speculative BAL worker results.
2
3use super::{
4    worker::{BalWorkerError, BalWorkerOutput},
5    BalExecutionError,
6};
7use crossbeam_channel::Receiver;
8
9#[derive(Debug, thiserror::Error)]
10pub(super) enum OrderedWorkerOutputError {
11    /// A worker returned an error.
12    #[error(transparent)]
13    Worker(#[from] BalWorkerError),
14    /// The result channel closed before every transaction had an output.
15    #[error("BAL worker result channel closed while waiting for ordered outputs")]
16    ResultChannelClosed,
17}
18
19impl From<OrderedWorkerOutputError> for BalExecutionError {
20    fn from(err: OrderedWorkerOutputError) -> Self {
21        match err {
22            OrderedWorkerOutputError::Worker(err) => err.into(),
23            other => Self::other(other),
24        }
25    }
26}
27
28/// Returns a blocking iterator over worker outputs in transaction order.
29///
30/// Workers may finish transactions out of order. This adapter buffers future-indexed outputs and
31/// yields each output only when every earlier transaction has been yielded. It owns no execution
32/// state and performs no canonical commit work.
33///
34/// Behavior:
35/// - each yielded `Ok` output has the next transaction index
36/// - worker errors are forwarded unchanged
37/// - closed channels before `total` outputs yield `Err`
38/// - out-of-bounds and duplicate indices panic because they violate the internal worker/dispatcher
39///   index invariant
40/// - after the first error, the iterator is exhausted
41pub(super) fn ordered_worker_outputs<R>(
42    result_rx: &Receiver<Result<BalWorkerOutput<R>, BalWorkerError>>,
43    total: usize,
44) -> impl Iterator<Item = Result<BalWorkerOutput<R>, OrderedWorkerOutputError>> + '_ {
45    OrderedWorkerOutputs::new(result_rx, total)
46}
47
48struct OrderedWorkerOutputs<'a, R> {
49    result_rx: &'a Receiver<Result<BalWorkerOutput<R>, BalWorkerError>>,
50    pending: Vec<Option<BalWorkerOutput<R>>>,
51    next: usize,
52    total: usize,
53    failed: bool,
54}
55
56impl<'a, R> OrderedWorkerOutputs<'a, R> {
57    fn new(
58        result_rx: &'a Receiver<Result<BalWorkerOutput<R>, BalWorkerError>>,
59        total: usize,
60    ) -> Self {
61        Self {
62            result_rx,
63            pending: (0..total).map(|_| None).collect(),
64            next: 0,
65            total,
66            failed: false,
67        }
68    }
69}
70
71impl<R> Iterator for OrderedWorkerOutputs<'_, R> {
72    type Item = Result<BalWorkerOutput<R>, OrderedWorkerOutputError>;
73
74    fn next(&mut self) -> Option<Self::Item> {
75        if self.failed || self.next >= self.total {
76            return None;
77        }
78
79        loop {
80            if let Some(output) = self.pending[self.next].take() {
81                self.next += 1;
82                return Some(Ok(output));
83            }
84
85            let output = match self.result_rx.recv() {
86                Ok(Ok(output)) => output,
87                Ok(Err(err)) => {
88                    self.failed = true;
89                    return Some(Err(err.into()));
90                }
91                Err(_) => {
92                    self.failed = true;
93                    return Some(Err(OrderedWorkerOutputError::ResultChannelClosed));
94                }
95            };
96
97            let index = output.index;
98            assert!(
99                index < self.total,
100                "BAL worker returned out-of-bounds transaction index {index}; total={}",
101                self.total
102            );
103            assert!(
104                index >= self.next && self.pending[index].is_none(),
105                "BAL worker returned duplicate transaction index {index}",
106            );
107
108            self.pending[index] = Some(output);
109        }
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tree::payload_processor::bal::BalExecutionError;
    use alloy_primitives::Address;

    /// Message type carried by the result channel in these tests.
    type WorkerResult = Result<BalWorkerOutput<u64>, BalWorkerError>;

    /// Builds a worker output for `index` carrying `result`, with placeholder metadata.
    fn output(index: usize, result: u64) -> BalWorkerOutput<u64> {
        BalWorkerOutput { index, signer: Address::ZERO, tx_gas_limit: 0, result }
    }

    /// Returns a receiver preloaded with `messages` whose sender has already been dropped.
    fn closed_channel(
        messages: impl IntoIterator<Item = WorkerResult>,
    ) -> crossbeam_channel::Receiver<WorkerResult> {
        let (tx, rx) = crossbeam_channel::unbounded();
        for message in messages {
            tx.send(message).unwrap();
        }
        rx
    }

    /// Asserts that `result` is an error whose display text contains `text`.
    fn expect_err_contains<R>(
        result: Result<BalWorkerOutput<R>, OrderedWorkerOutputError>,
        text: &str,
    ) {
        match result {
            Ok(_) => panic!("expected ordered worker output error"),
            Err(err) => {
                assert!(err.to_string().contains(text), "expected `{err}` to contain `{text}`")
            }
        }
    }

    #[test]
    fn yields_outputs_in_transaction_order() {
        let rx = closed_channel([Ok(output(2, 20)), Ok(output(0, 0)), Ok(output(1, 10))]);

        let results: Vec<u64> = ordered_worker_outputs(&rx, 3)
            .map(|item| item.expect("ordered output").result)
            .collect();

        assert_eq!(results, vec![0, 10, 20]);
    }

    #[test]
    fn forwards_worker_errors_and_then_stops() {
        let worker_failure = BalWorkerError::Setup(BalExecutionError::Execution(
            alloy_evm::block::BlockExecutionError::msg("worker failed"),
        ));
        let rx = closed_channel([Err(worker_failure)]);

        let mut outputs = ordered_worker_outputs::<u64>(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "worker failed");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_closed_channel_before_all_outputs_arrive() {
        let rx = closed_channel(Vec::<WorkerResult>::new());

        let mut outputs = ordered_worker_outputs::<u64>(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "waiting for ordered outputs");
        assert!(outputs.next().is_none());
    }

    #[test]
    #[should_panic(expected = "out-of-bounds transaction index")]
    fn panics_on_out_of_bounds_index() {
        let rx = closed_channel([Ok(output(1, 10))]);

        let mut outputs = ordered_worker_outputs(&rx, 1);
        let _ = outputs.next();
    }

    #[test]
    #[should_panic(expected = "duplicate transaction index")]
    fn panics_on_duplicate_pending_index() {
        // Index 1 is buffered while index 0 is still awaited, then reported a second time.
        let rx = closed_channel([Ok(output(1, 10)), Ok(output(1, 11))]);

        let mut outputs = ordered_worker_outputs(&rx, 2);
        let _ = outputs.next();
    }

    #[test]
    #[should_panic(expected = "duplicate transaction index")]
    fn panics_on_duplicate_already_yielded_index() {
        // Index 0 is yielded first, then reported again while index 1 is awaited.
        let rx = closed_channel([Ok(output(0, 0)), Ok(output(0, 1))]);

        let mut outputs = ordered_worker_outputs(&rx, 2);

        assert_eq!(outputs.next().expect("first item").expect("first output").result, 0);
        let _ = outputs.next();
    }
}