reth_engine_tree/tree/payload_processor/bal/
ordered_outputs.rs1use super::{
4 worker::{BalWorkerError, BalWorkerOutput},
5 BalExecutionError,
6};
7use crossbeam_channel::Receiver;
8
9#[derive(Debug, thiserror::Error)]
10pub(super) enum OrderedWorkerOutputError {
11 #[error(transparent)]
13 Worker(#[from] BalWorkerError),
14 #[error("BAL worker result channel closed while waiting for ordered outputs")]
16 ResultChannelClosed,
17}
18
19impl From<OrderedWorkerOutputError> for BalExecutionError {
20 fn from(err: OrderedWorkerOutputError) -> Self {
21 match err {
22 OrderedWorkerOutputError::Worker(err) => err.into(),
23 other => Self::other(other),
24 }
25 }
26}
27
28pub(super) fn ordered_worker_outputs<R>(
42 result_rx: &Receiver<Result<BalWorkerOutput<R>, BalWorkerError>>,
43 total: usize,
44) -> impl Iterator<Item = Result<BalWorkerOutput<R>, OrderedWorkerOutputError>> + '_ {
45 OrderedWorkerOutputs::new(result_rx, total)
46}
47
48struct OrderedWorkerOutputs<'a, R> {
49 result_rx: &'a Receiver<Result<BalWorkerOutput<R>, BalWorkerError>>,
50 pending: Vec<Option<BalWorkerOutput<R>>>,
51 next: usize,
52 total: usize,
53 failed: bool,
54}
55
56impl<'a, R> OrderedWorkerOutputs<'a, R> {
57 fn new(
58 result_rx: &'a Receiver<Result<BalWorkerOutput<R>, BalWorkerError>>,
59 total: usize,
60 ) -> Self {
61 Self {
62 result_rx,
63 pending: (0..total).map(|_| None).collect(),
64 next: 0,
65 total,
66 failed: false,
67 }
68 }
69}
70
71impl<R> Iterator for OrderedWorkerOutputs<'_, R> {
72 type Item = Result<BalWorkerOutput<R>, OrderedWorkerOutputError>;
73
74 fn next(&mut self) -> Option<Self::Item> {
75 if self.failed || self.next >= self.total {
76 return None;
77 }
78
79 loop {
80 if let Some(output) = self.pending[self.next].take() {
81 self.next += 1;
82 return Some(Ok(output));
83 }
84
85 let output = match self.result_rx.recv() {
86 Ok(Ok(output)) => output,
87 Ok(Err(err)) => {
88 self.failed = true;
89 return Some(Err(err.into()));
90 }
91 Err(_) => {
92 self.failed = true;
93 return Some(Err(OrderedWorkerOutputError::ResultChannelClosed));
94 }
95 };
96
97 let index = output.index;
98 assert!(
99 index < self.total,
100 "BAL worker returned out-of-bounds transaction index {index}; total={}",
101 self.total
102 );
103 assert!(
104 index >= self.next && self.pending[index].is_none(),
105 "BAL worker returned duplicate transaction index {index}",
106 );
107
108 self.pending[index] = Some(output);
109 }
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tree::payload_processor::bal::BalExecutionError;
    use alloy_primitives::Address;

    /// Message type carried by the worker result channel in these tests.
    type WorkerResult<R> = Result<BalWorkerOutput<R>, BalWorkerError>;

    /// Builds a worker output for transaction `index` carrying `result` as its payload.
    fn output(index: usize, result: u64) -> BalWorkerOutput<u64> {
        BalWorkerOutput { index, signer: Address::ZERO, tx_gas_limit: 0, result }
    }

    /// Returns the receiving half of an unbounded channel that was preloaded with `messages`
    /// (in the given order) and whose sender has already been dropped.
    fn closed_channel<R>(
        messages: Vec<WorkerResult<R>>,
    ) -> crossbeam_channel::Receiver<WorkerResult<R>> {
        let (tx, rx) = crossbeam_channel::unbounded();
        for message in messages {
            tx.send(message).unwrap();
        }
        rx
    }

    /// Asserts that `result` is an error whose display text contains `text`.
    fn expect_err_contains<R>(
        result: Result<BalWorkerOutput<R>, OrderedWorkerOutputError>,
        text: &str,
    ) {
        match result {
            Ok(_) => panic!("expected ordered worker output error"),
            Err(err) => {
                assert!(err.to_string().contains(text), "expected `{err}` to contain `{text}`");
            }
        }
    }

    #[test]
    fn yields_outputs_in_transaction_order() {
        let rx = closed_channel(vec![Ok(output(2, 20)), Ok(output(0, 0)), Ok(output(1, 10))]);

        let results: Vec<u64> = ordered_worker_outputs(&rx, 3)
            .map(|item| item.expect("ordered output").result)
            .collect();

        assert_eq!(results, vec![0, 10, 20]);
    }

    #[test]
    fn forwards_worker_errors_and_then_stops() {
        let worker_error = BalWorkerError::Setup(BalExecutionError::Execution(
            alloy_evm::block::BlockExecutionError::msg("worker failed"),
        ));
        let rx = closed_channel::<u64>(vec![Err(worker_error)]);

        let mut outputs = ordered_worker_outputs(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "worker failed");
        assert!(outputs.next().is_none());
    }

    #[test]
    fn rejects_closed_channel_before_all_outputs_arrive() {
        let rx = closed_channel::<u64>(Vec::new());

        let mut outputs = ordered_worker_outputs(&rx, 1);

        expect_err_contains(outputs.next().expect("first item"), "waiting for ordered outputs");
        assert!(outputs.next().is_none());
    }

    #[test]
    #[should_panic(expected = "out-of-bounds transaction index")]
    fn panics_on_out_of_bounds_index() {
        // Index 1 does not exist when only one output is expected.
        let rx = closed_channel(vec![Ok(output(1, 10))]);

        let mut outputs = ordered_worker_outputs(&rx, 1);
        let _ = outputs.next();
    }

    #[test]
    #[should_panic(expected = "duplicate transaction index")]
    fn panics_on_duplicate_pending_index() {
        // Index 1 is buffered first, then reported a second time before index 0 arrives.
        let rx = closed_channel(vec![Ok(output(1, 10)), Ok(output(1, 11))]);

        let mut outputs = ordered_worker_outputs(&rx, 2);
        let _ = outputs.next();
    }

    #[test]
    #[should_panic(expected = "duplicate transaction index")]
    fn panics_on_duplicate_already_yielded_index() {
        // Index 0 is yielded by the first call, then reported again.
        let rx = closed_channel(vec![Ok(output(0, 0)), Ok(output(0, 1))]);

        let mut outputs = ordered_worker_outputs(&rx, 2);

        assert_eq!(outputs.next().expect("first item").expect("first output").result, 0);
        let _ = outputs.next();
    }
}