// reth_engine_tree/tree/payload_processor/receipt_root_task.rs

use alloy_eips::Encodable2718;
use alloy_primitives::{Bloom, B256};
use crossbeam_channel::Receiver;
use reth_primitives_traits::Receipt;
use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
use tokio::sync::oneshot;
15
/// A receipt tagged with its position in the ordered receipt list.
///
/// Carrying the index alongside the receipt lets a consumer place each
/// receipt correctly even when receipts are delivered out of order.
#[derive(Debug, Clone)]
pub struct IndexedReceipt<R> {
    /// Zero-based position of the receipt in the ordered receipt list.
    pub index: usize,
    /// The receipt itself.
    pub receipt: R,
}
24
25impl<R> IndexedReceipt<R> {
26 #[inline]
28 pub const fn new(index: usize, receipt: R) -> Self {
29 Self { index, receipt }
30 }
31}
32
33#[derive(Debug)]
38pub struct ReceiptRootTaskHandle<R> {
39 receipt_rx: Receiver<IndexedReceipt<R>>,
41 result_tx: oneshot::Sender<(B256, Bloom)>,
43}
44
45impl<R: Receipt> ReceiptRootTaskHandle<R> {
46 pub const fn new(
48 receipt_rx: Receiver<IndexedReceipt<R>>,
49 result_tx: oneshot::Sender<(B256, Bloom)>,
50 ) -> Self {
51 Self { receipt_rx, result_tx }
52 }
53
54 pub fn run(self, receipts_len: usize) {
68 let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
69 let mut aggregated_bloom = Bloom::ZERO;
70 let mut encode_buf = Vec::new();
71 let mut received_count = 0usize;
72
73 for indexed_receipt in self.receipt_rx {
74 let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
75
76 encode_buf.clear();
77 receipt_with_bloom.encode_2718(&mut encode_buf);
78
79 aggregated_bloom |= *receipt_with_bloom.bloom_ref();
80 match builder.push(indexed_receipt.index, &encode_buf) {
81 Ok(()) => {
82 received_count += 1;
83 }
84 Err(err) => {
85 tracing::error!(
89 target: "engine::tree::payload_processor",
90 index = indexed_receipt.index,
91 ?err,
92 "Receipt root task received invalid receipt index, skipping"
93 );
94 }
95 }
96 }
97
98 let Ok(root) = builder.finalize() else {
99 tracing::error!(
103 target: "engine::tree::payload_processor",
104 expected = receipts_len,
105 received = received_count,
106 "Receipt root task received incomplete receipts, execution likely aborted"
107 );
108 return;
109 };
110 let _ = self.result_tx.send((root, aggregated_bloom));
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
    use alloy_primitives::{b256, Address, Bytes, Log};
    use crossbeam_channel::bounded;
    use reth_ethereum_primitives::{Receipt, TxType};

    /// Runs the receipt root task on a blocking thread and returns its result.
    ///
    /// `capacity` bounds the receipt channel, `receipts_len` is handed to
    /// [`ReceiptRootTaskHandle::run`], and `indexed` yields `(index, receipt)`
    /// pairs in the order they should be sent. The sender is dropped once all
    /// pairs have been sent so that the task can finish.
    async fn run_task<I>(capacity: usize, receipts_len: usize, indexed: I) -> (B256, Bloom)
    where
        I: IntoIterator<Item = (usize, Receipt)>,
    {
        let (tx, rx) = bounded(capacity);
        let (result_tx, result_rx) = oneshot::channel();

        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));

        for (index, receipt) in indexed {
            tx.send(IndexedReceipt::new(index, receipt)).unwrap();
        }
        drop(tx);

        join_handle.await.unwrap();
        result_rx.await.unwrap()
    }

    /// Root computed by the reference implementation for `receipts` in order.
    fn reference_root(receipts: &[Receipt]) -> B256 {
        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
        calculate_receipt_root(&receipts_with_bloom)
    }

    #[tokio::test]
    async fn test_receipt_root_task_empty() {
        let (root, bloom) = run_task(1, 0, Vec::new()).await;

        assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
        assert_eq!(bloom, Bloom::ZERO);
    }

    #[tokio::test]
    async fn test_receipt_root_task_single_receipt() {
        let receipts: Vec<Receipt> = vec![Receipt::default()];
        let expected_root = reference_root(&receipts);
        let receipts_len = receipts.len();

        let (root, _bloom) = run_task(1, receipts_len, receipts.into_iter().enumerate()).await;

        assert_eq!(root, expected_root);
    }

    #[tokio::test]
    async fn test_receipt_root_task_multiple_receipts() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
        let receipts_len = receipts.len();

        let (root, bloom) = run_task(4, receipts_len, receipts.into_iter().enumerate()).await;

        assert_eq!(
            root,
            b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
        );
        // Default receipts carry no logs, so the aggregated bloom is all zeros.
        assert_eq!(bloom, Bloom::ZERO);
    }

    #[tokio::test]
    async fn test_receipt_root_matches_standard_calculation() {
        let receipts = vec![
            Receipt {
                tx_type: TxType::Legacy,
                cumulative_gas_used: 21000,
                success: true,
                logs: vec![],
            },
            Receipt {
                tx_type: TxType::Eip1559,
                cumulative_gas_used: 42000,
                success: true,
                logs: vec![Log {
                    address: Address::ZERO,
                    data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
                }],
            },
            Receipt {
                tx_type: TxType::Eip2930,
                cumulative_gas_used: 63000,
                success: false,
                logs: vec![],
            },
        ];

        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
        let expected_root = calculate_receipt_root(&receipts_with_bloom);
        let expected_bloom =
            receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
        let receipts_len = receipts.len();

        let (task_root, task_bloom) =
            run_task(4, receipts_len, receipts.into_iter().enumerate()).await;

        assert_eq!(task_root, expected_root);
        assert_eq!(task_bloom, expected_bloom);
    }

    #[tokio::test]
    async fn test_receipt_root_task_out_of_order() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
        let expected_root = reference_root(&receipts);
        let receipts_len = receipts.len();

        // Send the highest index first to exercise out-of-order delivery.
        let (root, _bloom) =
            run_task(4, receipts_len, receipts.into_iter().enumerate().rev()).await;

        assert_eq!(root, expected_root);
    }
}