reth_engine_tree/tree/payload_processor/
receipt_root_task.rs1use alloy_eips::Encodable2718;
10use alloy_primitives::{map::HashMap, Bloom, B256};
11use crossbeam_channel::Receiver;
12use reth_primitives_traits::Receipt;
13use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
14use tokio::sync::oneshot;
15use tracing::debug_span;
16
17const RECEIPT_ENCODE_BUF_INITIAL_CAPACITY: usize = 512;
18
19#[derive(Debug, Clone)]
21pub struct IndexedReceipt<R> {
22 pub index: usize,
24 pub receipt: R,
26}
27
28impl<R> IndexedReceipt<R> {
29 #[inline]
31 pub const fn new(index: usize, receipt: R) -> Self {
32 Self { index, receipt }
33 }
34}
35
36#[derive(Debug)]
41pub struct ReceiptRootTaskHandle<R> {
42 receipt_rx: Receiver<IndexedReceipt<R>>,
44 result_tx: oneshot::Sender<(B256, Bloom)>,
46}
47
48impl<R: Receipt> ReceiptRootTaskHandle<R> {
49 pub const fn new(
51 receipt_rx: Receiver<IndexedReceipt<R>>,
52 result_tx: oneshot::Sender<(B256, Bloom)>,
53 ) -> Self {
54 Self { receipt_rx, result_tx }
55 }
56
57 pub fn run(self, receipts_len: impl Into<Option<usize>>) {
71 let receipts_len = receipts_len.into();
72
73 let _span = debug_span!(
74 target: "engine::tree::payload_processor",
75 "receipt_root",
76 receipts_len,
77 )
78 .entered();
79
80 let mut builder = OrderedTrieRootEncodedBuilder::new();
81 let mut aggregated_bloom = Bloom::ZERO;
82 let mut encode_buf = Vec::with_capacity(RECEIPT_ENCODE_BUF_INITIAL_CAPACITY);
83 let mut next = 0usize;
84 let mut pending = HashMap::new();
85
86 let mut push = |receipt: R| {
87 let receipt_with_bloom = receipt.with_bloom_ref();
88
89 encode_buf.clear();
90 receipt_with_bloom.encode_2718(&mut encode_buf);
91
92 aggregated_bloom |= *receipt_with_bloom.bloom_ref();
93 builder.push_next(&encode_buf);
94 };
95
96 for indexed_receipt in self.receipt_rx {
97 if indexed_receipt.index == next {
98 push(indexed_receipt.receipt);
99 next += 1;
100
101 while let Some(receipt) = pending.remove(&next) {
102 push(receipt);
103 next += 1;
104 }
105 } else {
106 pending.insert(indexed_receipt.index, indexed_receipt.receipt);
107 }
108 }
109
110 if receipts_len.is_some_and(|len| len != next) {
111 tracing::error!(
112 target: "engine::tree::payload_processor",
113 expected = receipts_len,
114 received = next,
115 "Receipt root task received incomplete receipts, execution likely aborted"
116 );
117 return;
118 }
119
120 if !pending.is_empty() {
121 tracing::error!(
122 target: "engine::tree::payload_processor",
123 received = next,
124 pending = pending.len(),
125 "Receipt root task received gapped receipts"
126 );
127 return;
128 }
129
130 let _ = self.result_tx.send((builder.finalize(), aggregated_bloom));
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137 use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
138 use alloy_primitives::{b256, hex, Address, Bytes, Log};
139 use crossbeam_channel::bounded;
140 use reth_ethereum_primitives::{Receipt, TxType};
141
142 #[tokio::test]
143 async fn test_receipt_root_task_empty() {
144 let (_tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
145 let (result_tx, result_rx) = oneshot::channel();
146 drop(_tx);
147
148 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
149 tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();
150
151 let (root, bloom) = result_rx.await.unwrap();
152
153 assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
155 assert_eq!(bloom, Bloom::ZERO);
156 }
157
158 #[tokio::test]
159 async fn test_receipt_root_task_single_receipt() {
160 let receipts: Vec<Receipt> = vec![Receipt::default()];
161
162 let (tx, rx) = bounded(1);
163 let (result_tx, result_rx) = oneshot::channel();
164 let receipts_len = receipts.len();
165
166 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
167 let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
168
169 for (i, receipt) in receipts.clone().into_iter().enumerate() {
170 tx.send(IndexedReceipt::new(i, receipt)).unwrap();
171 }
172 drop(tx);
173
174 join_handle.await.unwrap();
175 let (root, _bloom) = result_rx.await.unwrap();
176
177 let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
179 let expected_root = calculate_receipt_root(&receipts_with_bloom);
180
181 assert_eq!(root, expected_root);
182 }
183
184 #[tokio::test]
185 async fn test_receipt_root_task_multiple_receipts() {
186 let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
187
188 let (tx, rx) = bounded(4);
189 let (result_tx, result_rx) = oneshot::channel();
190 let receipts_len = receipts.len();
191
192 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
193 let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
194
195 for (i, receipt) in receipts.into_iter().enumerate() {
196 tx.send(IndexedReceipt::new(i, receipt)).unwrap();
197 }
198 drop(tx);
199
200 join_handle.await.unwrap();
201 let (root, bloom) = result_rx.await.unwrap();
202
203 assert_eq!(
205 root,
206 b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
207 );
208 assert_eq!(
209 bloom,
210 Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
211 );
212 }
213
214 #[tokio::test]
215 async fn test_receipt_root_matches_standard_calculation() {
216 let receipts = vec![
218 Receipt {
219 tx_type: TxType::Legacy,
220 cumulative_gas_used: 21000,
221 success: true,
222 logs: vec![],
223 },
224 Receipt {
225 tx_type: TxType::Eip1559,
226 cumulative_gas_used: 42000,
227 success: true,
228 logs: vec![Log {
229 address: Address::ZERO,
230 data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
231 }],
232 },
233 Receipt {
234 tx_type: TxType::Eip2930,
235 cumulative_gas_used: 63000,
236 success: false,
237 logs: vec![],
238 },
239 ];
240
241 let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
243 let expected_root = calculate_receipt_root(&receipts_with_bloom);
244 let expected_bloom =
245 receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
246
247 let (tx, rx) = bounded(4);
249 let (result_tx, result_rx) = oneshot::channel();
250 let receipts_len = receipts.len();
251
252 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
253 let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
254
255 for (i, receipt) in receipts.into_iter().enumerate() {
256 tx.send(IndexedReceipt::new(i, receipt)).unwrap();
257 }
258 drop(tx);
259
260 join_handle.await.unwrap();
261 let (task_root, task_bloom) = result_rx.await.unwrap();
262
263 assert_eq!(task_root, expected_root);
264 assert_eq!(task_bloom, expected_bloom);
265 }
266
267 #[tokio::test]
268 async fn test_receipt_root_task_out_of_order() {
269 let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
270
271 let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
273 let expected_root = calculate_receipt_root(&receipts_with_bloom);
274
275 let (tx, rx) = bounded(4);
276 let (result_tx, result_rx) = oneshot::channel();
277 let receipts_len = receipts.len();
278
279 let handle = ReceiptRootTaskHandle::new(rx, result_tx);
280 let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
281
282 for (i, receipt) in receipts.into_iter().enumerate().rev() {
284 tx.send(IndexedReceipt::new(i, receipt)).unwrap();
285 }
286 drop(tx);
287
288 join_handle.await.unwrap();
289 let (root, _bloom) = result_rx.await.unwrap();
290
291 assert_eq!(root, expected_root);
292 }
293}