reth_engine_tree/tree/payload_processor/bal/
ordered_outputs.rs1use super::{worker::BalWorkerOutput, BalExecutionError};
4use alloy_evm::block::BlockExecutionError;
5use crossbeam_channel::Receiver;
6
7pub(super) fn ordered_worker_outputs<R>(
23 result_rx: &Receiver<Result<BalWorkerOutput<R>, BalExecutionError>>,
24 total: usize,
25) -> impl Iterator<Item = Result<BalWorkerOutput<R>, BalExecutionError>> + '_ {
26 OrderedWorkerOutputs::new(result_rx, total)
27}
28
29struct OrderedWorkerOutputs<'a, R> {
30 result_rx: &'a Receiver<Result<BalWorkerOutput<R>, BalExecutionError>>,
31 pending: Vec<Option<BalWorkerOutput<R>>>,
32 next: usize,
33 total: usize,
34 failed: bool,
35}
36
37impl<'a, R> OrderedWorkerOutputs<'a, R> {
38 fn new(
39 result_rx: &'a Receiver<Result<BalWorkerOutput<R>, BalExecutionError>>,
40 total: usize,
41 ) -> Self {
42 Self {
43 result_rx,
44 pending: (0..total).map(|_| None).collect(),
45 next: 0,
46 total,
47 failed: false,
48 }
49 }
50}
51
52impl<R> Iterator for OrderedWorkerOutputs<'_, R> {
53 type Item = Result<BalWorkerOutput<R>, BalExecutionError>;
54
55 fn next(&mut self) -> Option<Self::Item> {
56 if self.failed || self.next >= self.total {
57 return None;
58 }
59
60 loop {
61 if let Some(output) = self.pending[self.next].take() {
62 self.next += 1;
63 return Some(Ok(output));
64 }
65
66 let output = match self.result_rx.recv() {
67 Ok(Ok(output)) => output,
68 Ok(Err(err)) => {
69 self.failed = true;
70 return Some(Err(err));
71 }
72 Err(_) => {
73 self.failed = true;
74 return Some(Err(BalExecutionError::Evm(BlockExecutionError::msg(
75 "BAL worker result channel closed while waiting for ordered outputs",
76 ))));
77 }
78 };
79
80 let index = output.index;
81 if index >= self.total {
82 self.failed = true;
83 return Some(Err(BalExecutionError::Evm(BlockExecutionError::msg(
84 "BAL worker returned out-of-bounds transaction index",
85 ))));
86 }
87
88 if index < self.next || self.pending[index].is_some() {
89 self.failed = true;
90 return Some(Err(BalExecutionError::Evm(BlockExecutionError::msg(
91 "BAL worker returned duplicate transaction index",
92 ))));
93 }
94
95 self.pending[index] = Some(output);
96 }
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::Address;

    /// Item type carried by the worker result channel in these tests.
    type WorkerResult<R> = Result<BalWorkerOutput<R>, BalExecutionError>;

    /// Builds a worker output for transaction `index` carrying `result`.
    fn output(index: usize, result: u64) -> BalWorkerOutput<u64> {
        BalWorkerOutput { index, signer: Address::ZERO, tx_gas_limit: 0, result }
    }

    /// Returns a receiver preloaded with `messages` (in order) whose sender is already dropped.
    fn closed_channel<R>(messages: Vec<WorkerResult<R>>) -> Receiver<WorkerResult<R>> {
        let (tx, rx) = crossbeam_channel::unbounded();
        for message in messages {
            tx.send(message).unwrap();
        }
        rx
    }

    /// Asserts that `result` is an error whose display text contains `text`.
    fn expect_err_contains<R>(result: Result<BalWorkerOutput<R>, BalExecutionError>, text: &str) {
        match result {
            Ok(_) => panic!("expected ordered worker output error"),
            Err(err) => {
                assert!(err.to_string().contains(text), "expected `{err}` to contain `{text}`")
            }
        }
    }

    #[test]
    fn yields_outputs_in_transaction_order() {
        let rx = closed_channel(vec![Ok(output(2, 20)), Ok(output(0, 0)), Ok(output(1, 10))]);

        let results: Vec<u64> = ordered_worker_outputs(&rx, 3)
            .map(|item| item.expect("ordered output").result)
            .collect();

        assert_eq!(results, vec![0, 10, 20]);
    }

    #[test]
    fn forwards_worker_errors_and_then_stops() {
        let rx = closed_channel::<u64>(vec![Err(BalExecutionError::Evm(
            BlockExecutionError::msg("worker failed"),
        ))]);

        let mut outputs = ordered_worker_outputs(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "worker failed");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_closed_channel_before_all_outputs_arrive() {
        let rx = closed_channel::<u64>(Vec::new());

        let mut outputs = ordered_worker_outputs(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "waiting for ordered outputs");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_out_of_bounds_index() {
        let rx = closed_channel(vec![Ok(output(1, 10))]);

        let mut outputs = ordered_worker_outputs(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "out-of-bounds");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_duplicate_pending_index() {
        let rx = closed_channel(vec![Ok(output(1, 10)), Ok(output(1, 11))]);

        let mut outputs = ordered_worker_outputs(&rx, 2);

        expect_err_contains(outputs.next().expect("first item"), "duplicate");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_duplicate_already_yielded_index() {
        let rx = closed_channel(vec![Ok(output(0, 0)), Ok(output(0, 1))]);

        let mut outputs = ordered_worker_outputs(&rx, 2);

        let first = outputs.next().expect("first item").expect("first output");
        assert_eq!(first.result, 0);
        expect_err_contains(outputs.next().expect("second item"), "duplicate");
        assert!(outputs.next().is_none());
    }
}