reth_engine_tree/tree/payload_processor/
receipt_root_task.rs

1//! Receipt root computation in a background task.
2//!
3//! This module provides a streaming receipt root builder that computes the receipt trie root
4//! in a background thread. Receipts are sent via a channel with their index, and for each
5//! receipt received, the builder incrementally flushes leaves to the underlying
6//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the
7//! computed root.
8
9use alloy_eips::Encodable2718;
10use alloy_primitives::{Bloom, B256};
11use crossbeam_channel::Receiver;
12use reth_primitives_traits::Receipt;
13use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
14use tokio::sync::oneshot;
15
/// A receipt tagged with its position in the block.
///
/// This is the unit of work handed to the background receipt root task, which encodes the
/// receipt and inserts it into the trie under `index`.
#[derive(Debug, Clone)]
pub struct IndexedReceipt<R> {
    /// Index of the originating transaction within the block.
    pub index: usize,
    /// The receipt produced by that transaction.
    pub receipt: R,
}

impl<R> IndexedReceipt<R> {
    /// Pairs `receipt` with the `index` of the transaction it belongs to.
    #[inline]
    pub const fn new(index: usize, receipt: R) -> Self {
        IndexedReceipt { index, receipt }
    }
}
32
33/// Handle for running the receipt root computation in a background task.
34///
35/// This struct holds the channels needed to receive receipts and send the result.
36/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task).
37#[derive(Debug)]
38pub struct ReceiptRootTaskHandle<R> {
39    /// Receiver for indexed receipts.
40    receipt_rx: Receiver<IndexedReceipt<R>>,
41    /// Sender for the computed result.
42    result_tx: oneshot::Sender<(B256, Bloom)>,
43}
44
45impl<R: Receipt> ReceiptRootTaskHandle<R> {
46    /// Creates a new handle from the receipt receiver and result sender channels.
47    pub const fn new(
48        receipt_rx: Receiver<IndexedReceipt<R>>,
49        result_tx: oneshot::Sender<(B256, Bloom)>,
50    ) -> Self {
51        Self { receipt_rx, result_tx }
52    }
53
54    /// Runs the receipt root computation, consuming the handle.
55    ///
56    /// This method receives indexed receipts from the channel, encodes them,
57    /// and builds the trie incrementally. When all receipts have been received
58    /// (channel closed), it sends the result through the oneshot channel.
59    ///
60    /// This is designed to be called inside a blocking task (e.g., via
61    /// `executor.spawn_blocking(move || handle.run(receipts_len))`).
62    ///
63    /// # Arguments
64    ///
65    /// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
66    ///   the trie keys according to RLP encoding rules.
67    pub fn run(self, receipts_len: usize) {
68        let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
69        let mut aggregated_bloom = Bloom::ZERO;
70        let mut encode_buf = Vec::new();
71        let mut received_count = 0usize;
72
73        for indexed_receipt in self.receipt_rx {
74            let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
75
76            encode_buf.clear();
77            receipt_with_bloom.encode_2718(&mut encode_buf);
78
79            aggregated_bloom |= *receipt_with_bloom.bloom_ref();
80            builder.push_unchecked(indexed_receipt.index, &encode_buf);
81            received_count += 1;
82        }
83
84        let Ok(root) = builder.finalize() else {
85            // Finalize fails if we didn't receive exactly `receipts_len` receipts. This can
86            // happen if execution was aborted early (e.g., invalid transaction encountered).
87            // We return without sending a result, allowing the caller to handle the abort.
88            tracing::error!(
89                target: "engine::tree::payload_processor",
90                expected = receipts_len,
91                received = received_count,
92                "Receipt root task received incomplete receipts, execution likely aborted"
93            );
94            return;
95        };
96        let _ = self.result_tx.send((root, aggregated_bloom));
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
    use alloy_primitives::{b256, Address, Bytes, Log};
    use crossbeam_channel::bounded;
    use reth_ethereum_primitives::{Receipt, TxType};

    /// Drives a [`ReceiptRootTaskHandle`] end to end.
    ///
    /// Spawns the task on a blocking thread with `receipts_len` as the expected count, streams
    /// every `(index, receipt)` pair through a bounded channel of `channel_capacity`, closes the
    /// channel and waits for the task to finish.
    ///
    /// Returns `Some((root, bloom))` if the task sent a result, or `None` if it dropped the
    /// result sender without sending.
    async fn run_task(
        indexed_receipts: impl IntoIterator<Item = (usize, Receipt)>,
        receipts_len: usize,
        channel_capacity: usize,
    ) -> Option<(B256, Bloom)> {
        let (tx, rx) = bounded(channel_capacity);
        let (result_tx, result_rx) = oneshot::channel();

        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));

        for (index, receipt) in indexed_receipts {
            tx.send(IndexedReceipt::new(index, receipt)).unwrap();
        }
        // Closing the channel is what tells the task that no more receipts will arrive.
        drop(tx);

        join_handle.await.unwrap();
        result_rx.await.ok()
    }

    /// Computes the receipt root with the standard, non-streaming calculation.
    fn expected_root(receipts: &[Receipt]) -> B256 {
        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
        calculate_receipt_root(&receipts_with_bloom)
    }

    #[tokio::test]
    async fn test_receipt_root_task_empty() {
        let (root, bloom) = run_task(Vec::<(usize, Receipt)>::new(), 0, 1)
            .await
            .expect("task should send a result");

        // Empty trie root
        assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
        assert_eq!(bloom, Bloom::ZERO);
    }

    #[tokio::test]
    async fn test_receipt_root_task_single_receipt() {
        let receipts: Vec<Receipt> = vec![Receipt::default()];

        let (root, _bloom) = run_task(receipts.iter().cloned().enumerate(), receipts.len(), 1)
            .await
            .expect("task should send a result");

        // Verify against the standard calculation
        assert_eq!(root, expected_root(&receipts));
    }

    #[tokio::test]
    async fn test_receipt_root_task_multiple_receipts() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
        let receipts_len = receipts.len();

        let (root, bloom) = run_task(receipts.into_iter().enumerate(), receipts_len, 4)
            .await
            .expect("task should send a result");

        // Verify against expected values from existing test
        assert_eq!(
            root,
            b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
        );
        // Default receipts carry no logs, so the aggregated bloom is all zeroes.
        assert_eq!(bloom, Bloom::ZERO);
    }

    #[tokio::test]
    async fn test_receipt_root_matches_standard_calculation() {
        // Create some receipts with actual data
        let receipts = vec![
            Receipt {
                tx_type: TxType::Legacy,
                cumulative_gas_used: 21000,
                success: true,
                logs: vec![],
            },
            Receipt {
                tx_type: TxType::Eip1559,
                cumulative_gas_used: 42000,
                success: true,
                logs: vec![Log {
                    address: Address::ZERO,
                    data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
                }],
            },
            Receipt {
                tx_type: TxType::Eip2930,
                cumulative_gas_used: 63000,
                success: false,
                logs: vec![],
            },
        ];

        // Calculate expected values first (before we move receipts)
        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
        let expected_root = calculate_receipt_root(&receipts_with_bloom);
        let expected_bloom =
            receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());

        // Calculate using the task
        let receipts_len = receipts.len();
        let (task_root, task_bloom) = run_task(receipts.into_iter().enumerate(), receipts_len, 4)
            .await
            .expect("task should send a result");

        assert_eq!(task_root, expected_root);
        assert_eq!(task_bloom, expected_bloom);
    }

    #[tokio::test]
    async fn test_receipt_root_task_out_of_order() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];

        // Calculate expected values first (before we move receipts)
        let expected_root = expected_root(&receipts);
        let receipts_len = receipts.len();

        // Send in reverse order to test out-of-order handling
        let (root, _bloom) = run_task(receipts.into_iter().enumerate().rev(), receipts_len, 4)
            .await
            .expect("task should send a result");

        assert_eq!(root, expected_root);
    }

    #[tokio::test]
    async fn test_receipt_root_task_incomplete_receipts() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
        let receipts_len = receipts.len();

        // Close the channel after only two of the five expected receipts, as happens when
        // execution is aborted early. The task must not send a result in that case.
        let result = run_task(receipts.into_iter().enumerate().take(2), receipts_len, 4).await;

        assert!(result.is_none());
    }
}