reth_engine_tree/tree/payload_processor/
receipt_root_task.rs1use alloy_eips::Encodable2718;
10use alloy_primitives::{Bloom, B256};
11use crossbeam_channel::Receiver;
12use reth_primitives_traits::Receipt;
13use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
14use tokio::sync::oneshot;
15use tracing::debug_span;
16
17#[derive(Debug, Clone)]
19pub struct IndexedReceipt<R> {
20 pub index: usize,
22 pub receipt: R,
24}
25
26impl<R> IndexedReceipt<R> {
27 #[inline]
29 pub const fn new(index: usize, receipt: R) -> Self {
30 Self { index, receipt }
31 }
32}
33
34#[derive(Debug)]
39pub struct ReceiptRootTaskHandle<R> {
40 receipt_rx: Receiver<IndexedReceipt<R>>,
42 result_tx: oneshot::Sender<(B256, Bloom)>,
44}
45
46impl<R: Receipt> ReceiptRootTaskHandle<R> {
47 pub const fn new(
49 receipt_rx: Receiver<IndexedReceipt<R>>,
50 result_tx: oneshot::Sender<(B256, Bloom)>,
51 ) -> Self {
52 Self { receipt_rx, result_tx }
53 }
54
55 pub fn run(self, receipts_len: usize) {
69 let _span = debug_span!(
70 target: "engine::tree::payload_processor",
71 "receipt_root",
72 receipts_len,
73 )
74 .entered();
75
76 let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
77 let mut aggregated_bloom = Bloom::ZERO;
78 let mut encode_buf = Vec::new();
79 let mut received_count = 0usize;
80
81 for indexed_receipt in self.receipt_rx {
82 let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
83
84 encode_buf.clear();
85 receipt_with_bloom.encode_2718(&mut encode_buf);
86
87 aggregated_bloom |= *receipt_with_bloom.bloom_ref();
88 match builder.push(indexed_receipt.index, &encode_buf) {
89 Ok(()) => {
90 received_count += 1;
91 }
92 Err(err) => {
93 tracing::error!(
97 target: "engine::tree::payload_processor",
98 index = indexed_receipt.index,
99 ?err,
100 "Receipt root task received invalid receipt index, skipping"
101 );
102 }
103 }
104 }
105
106 let Ok(root) = builder.finalize() else {
107 tracing::error!(
111 target: "engine::tree::payload_processor",
112 expected = receipts_len,
113 received = received_count,
114 "Receipt root task received incomplete receipts, execution likely aborted"
115 );
116 return;
117 };
118 let _ = self.result_tx.send((root, aggregated_bloom));
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
126 use alloy_primitives::{b256, hex, Address, Bytes, Log};
127 use crossbeam_channel::bounded;
128 use reth_ethereum_primitives::{Receipt, TxType};
129
130 #[tokio::test]
131 async fn test_receipt_root_task_empty() {
132 let (_tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
133 let (result_tx, result_rx) = oneshot::channel();
134 drop(_tx);
135
136 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
137 tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();
138
139 let (root, bloom) = result_rx.await.unwrap();
140
141 assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
143 assert_eq!(bloom, Bloom::ZERO);
144 }
145
146 #[tokio::test]
147 async fn test_receipt_root_task_single_receipt() {
148 let receipts: Vec<Receipt> = vec![Receipt::default()];
149
150 let (tx, rx) = bounded(1);
151 let (result_tx, result_rx) = oneshot::channel();
152 let receipts_len = receipts.len();
153
154 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
155 let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
156
157 for (i, receipt) in receipts.clone().into_iter().enumerate() {
158 tx.send(IndexedReceipt::new(i, receipt)).unwrap();
159 }
160 drop(tx);
161
162 join_handle.await.unwrap();
163 let (root, _bloom) = result_rx.await.unwrap();
164
165 let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
167 let expected_root = calculate_receipt_root(&receipts_with_bloom);
168
169 assert_eq!(root, expected_root);
170 }
171
172 #[tokio::test]
173 async fn test_receipt_root_task_multiple_receipts() {
174 let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
175
176 let (tx, rx) = bounded(4);
177 let (result_tx, result_rx) = oneshot::channel();
178 let receipts_len = receipts.len();
179
180 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
181 let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
182
183 for (i, receipt) in receipts.into_iter().enumerate() {
184 tx.send(IndexedReceipt::new(i, receipt)).unwrap();
185 }
186 drop(tx);
187
188 join_handle.await.unwrap();
189 let (root, bloom) = result_rx.await.unwrap();
190
191 assert_eq!(
193 root,
194 b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
195 );
196 assert_eq!(
197 bloom,
198 Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
199 );
200 }
201
202 #[tokio::test]
203 async fn test_receipt_root_matches_standard_calculation() {
204 let receipts = vec![
206 Receipt {
207 tx_type: TxType::Legacy,
208 cumulative_gas_used: 21000,
209 success: true,
210 logs: vec![],
211 },
212 Receipt {
213 tx_type: TxType::Eip1559,
214 cumulative_gas_used: 42000,
215 success: true,
216 logs: vec![Log {
217 address: Address::ZERO,
218 data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
219 }],
220 },
221 Receipt {
222 tx_type: TxType::Eip2930,
223 cumulative_gas_used: 63000,
224 success: false,
225 logs: vec![],
226 },
227 ];
228
229 let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
231 let expected_root = calculate_receipt_root(&receipts_with_bloom);
232 let expected_bloom =
233 receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
234
235 let (tx, rx) = bounded(4);
237 let (result_tx, result_rx) = oneshot::channel();
238 let receipts_len = receipts.len();
239
240 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
241 let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
242
243 for (i, receipt) in receipts.into_iter().enumerate() {
244 tx.send(IndexedReceipt::new(i, receipt)).unwrap();
245 }
246 drop(tx);
247
248 join_handle.await.unwrap();
249 let (task_root, task_bloom) = result_rx.await.unwrap();
250
251 assert_eq!(task_root, expected_root);
252 assert_eq!(task_bloom, expected_bloom);
253 }
254
255 #[tokio::test]
256 async fn test_receipt_root_task_out_of_order() {
257 let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
258
259 let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
261 let expected_root = calculate_receipt_root(&receipts_with_bloom);
262
263 let (tx, rx) = bounded(4);
264 let (result_tx, result_rx) = oneshot::channel();
265 let receipts_len = receipts.len();
266
267 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
268 let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
269
270 for (i, receipt) in receipts.into_iter().enumerate().rev() {
272 tx.send(IndexedReceipt::new(i, receipt)).unwrap();
273 }
274 drop(tx);
275
276 join_handle.await.unwrap();
277 let (root, _bloom) = result_rx.await.unwrap();
278
279 assert_eq!(root, expected_root);
280 }
281}