reth_engine_tree/tree/payload_processor/
receipt_root_task.rs1use alloy_eips::Encodable2718;
10use alloy_primitives::{Bloom, B256};
11use crossbeam_channel::Receiver;
12use reth_primitives_traits::Receipt;
13use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
14use tokio::sync::oneshot;
15
/// A receipt bundled with the index it belongs to in the ordered receipt list.
///
/// Carrying the index alongside the receipt lets a consumer place each receipt at the right
/// position regardless of the order in which the values are delivered.
#[derive(Clone, Debug)]
pub struct IndexedReceipt<R> {
    /// Position of the receipt in the ordered receipt list.
    pub index: usize,
    /// The receipt itself.
    pub receipt: R,
}

impl<R> IndexedReceipt<R> {
    /// Pairs `receipt` with its `index`.
    #[inline]
    pub const fn new(index: usize, receipt: R) -> Self {
        IndexedReceipt { index, receipt }
    }
}
32
33#[derive(Debug)]
38pub struct ReceiptRootTaskHandle<R> {
39 receipt_rx: Receiver<IndexedReceipt<R>>,
41 result_tx: oneshot::Sender<(B256, Bloom)>,
43}
44
45impl<R: Receipt> ReceiptRootTaskHandle<R> {
46 pub const fn new(
48 receipt_rx: Receiver<IndexedReceipt<R>>,
49 result_tx: oneshot::Sender<(B256, Bloom)>,
50 ) -> Self {
51 Self { receipt_rx, result_tx }
52 }
53
54 pub fn run(self, receipts_len: usize) {
68 let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
69 let mut aggregated_bloom = Bloom::ZERO;
70 let mut encode_buf = Vec::new();
71 let mut received_count = 0usize;
72
73 for indexed_receipt in self.receipt_rx {
74 let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
75
76 encode_buf.clear();
77 receipt_with_bloom.encode_2718(&mut encode_buf);
78
79 aggregated_bloom |= *receipt_with_bloom.bloom_ref();
80 builder.push_unchecked(indexed_receipt.index, &encode_buf);
81 received_count += 1;
82 }
83
84 let Ok(root) = builder.finalize() else {
85 tracing::error!(
89 target: "engine::tree::payload_processor",
90 expected = receipts_len,
91 received = received_count,
92 "Receipt root task received incomplete receipts, execution likely aborted"
93 );
94 return;
95 };
96 let _ = self.result_tx.send((root, aggregated_bloom));
97 }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
    use alloy_primitives::{b256, Address, Bytes, Log};
    use crossbeam_channel::bounded;
    use reth_ethereum_primitives::{Receipt, TxType};

    /// Capacity of the receipt channel used by the tests. It is smaller than the largest receipt
    /// set below, so the sending side also runs against a full channel.
    const CHANNEL_CAPACITY: usize = 4;

    /// Spawns the task on the blocking pool, streams `items` to it in iteration order, closes the
    /// channel and returns the `(root, bloom)` pair the task published.
    async fn run_task(
        receipts_len: usize,
        items: impl IntoIterator<Item = IndexedReceipt<Receipt>>,
    ) -> (B256, Bloom) {
        let (tx, rx) = bounded(CHANNEL_CAPACITY);
        let (result_tx, result_rx) = oneshot::channel();

        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));

        for item in items {
            tx.send(item).unwrap();
        }
        // Dropping the only sender closes the channel, which lets the task finish.
        drop(tx);

        join_handle.await.unwrap();
        result_rx.await.unwrap()
    }

    /// Pairs every receipt with its position in `receipts`.
    fn indexed(
        receipts: Vec<Receipt>,
    ) -> impl DoubleEndedIterator<Item = IndexedReceipt<Receipt>> {
        receipts.into_iter().enumerate().map(|(index, receipt)| IndexedReceipt::new(index, receipt))
    }

    /// Computes the reference root with `calculate_receipt_root` and the reference bloom by
    /// OR-ing the bloom of every receipt.
    fn expected_root_and_bloom(receipts: &[Receipt]) -> (B256, Bloom) {
        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
        let root = calculate_receipt_root(&receipts_with_bloom);
        let bloom = receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
        (root, bloom)
    }

    /// No receipts at all must yield the empty trie root and an empty bloom.
    #[tokio::test]
    async fn test_receipt_root_task_empty() {
        let (root, bloom) = run_task(0, indexed(Vec::new())).await;

        assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
        assert_eq!(bloom, Bloom::ZERO);
    }

    /// A single receipt must match the reference calculation.
    #[tokio::test]
    async fn test_receipt_root_task_single_receipt() {
        let receipts: Vec<Receipt> = vec![Receipt::default()];
        let (expected_root, expected_bloom) = expected_root_and_bloom(&receipts);

        let (root, bloom) = run_task(receipts.len(), indexed(receipts)).await;

        assert_eq!(root, expected_root);
        assert_eq!(bloom, expected_bloom);
    }

    /// Several default receipts must produce the pinned root; default receipts carry no logs, so
    /// the aggregated bloom stays empty.
    #[tokio::test]
    async fn test_receipt_root_task_multiple_receipts() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];

        let (root, bloom) = run_task(receipts.len(), indexed(receipts)).await;

        assert_eq!(
            root,
            b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
        );
        assert_eq!(bloom, Bloom::ZERO);
    }

    /// Receipts of mixed types, with and without logs, must match the reference root and bloom.
    #[tokio::test]
    async fn test_receipt_root_matches_standard_calculation() {
        let receipts = vec![
            Receipt {
                tx_type: TxType::Legacy,
                cumulative_gas_used: 21000,
                success: true,
                logs: vec![],
            },
            Receipt {
                tx_type: TxType::Eip1559,
                cumulative_gas_used: 42000,
                success: true,
                logs: vec![Log {
                    address: Address::ZERO,
                    data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
                }],
            },
            Receipt {
                tx_type: TxType::Eip2930,
                cumulative_gas_used: 63000,
                success: false,
                logs: vec![],
            },
        ];
        let (expected_root, expected_bloom) = expected_root_and_bloom(&receipts);

        let (task_root, task_bloom) = run_task(receipts.len(), indexed(receipts)).await;

        assert_eq!(task_root, expected_root);
        assert_eq!(task_bloom, expected_bloom);
    }

    /// Delivering the receipts in reverse order must not change the result.
    #[tokio::test]
    async fn test_receipt_root_task_out_of_order() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
        let (expected_root, expected_bloom) = expected_root_and_bloom(&receipts);

        let (root, bloom) = run_task(receipts.len(), indexed(receipts).rev()).await;

        assert_eq!(root, expected_root);
        assert_eq!(bloom, expected_bloom);
    }
}