Skip to main content

reth_engine_tree/tree/payload_processor/
receipt_root_task.rs

1//! Receipt root computation in a background task.
2//!
3//! This module provides a streaming receipt root builder that computes the receipt trie root
4//! in a background thread. Receipts are sent via a channel with their index, and for each
5//! receipt received, the builder incrementally flushes leaves to the underlying
6//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the
7//! computed root.
8
9use alloy_eips::Encodable2718;
10use alloy_primitives::{Bloom, B256};
11use crossbeam_channel::Receiver;
12use reth_primitives_traits::Receipt;
13use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
14use tokio::sync::oneshot;
15
16/// Receipt with index, ready to be sent to the background task for encoding and trie building.
17#[derive(Debug, Clone)]
18pub struct IndexedReceipt<R> {
19    /// The transaction index within the block.
20    pub index: usize,
21    /// The receipt.
22    pub receipt: R,
23}
24
25impl<R> IndexedReceipt<R> {
26    /// Creates a new indexed receipt.
27    #[inline]
28    pub const fn new(index: usize, receipt: R) -> Self {
29        Self { index, receipt }
30    }
31}
32
33/// Handle for running the receipt root computation in a background task.
34///
35/// This struct holds the channels needed to receive receipts and send the result.
36/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task).
37#[derive(Debug)]
38pub struct ReceiptRootTaskHandle<R> {
39    /// Receiver for indexed receipts.
40    receipt_rx: Receiver<IndexedReceipt<R>>,
41    /// Sender for the computed result.
42    result_tx: oneshot::Sender<(B256, Bloom)>,
43}
44
45impl<R: Receipt> ReceiptRootTaskHandle<R> {
46    /// Creates a new handle from the receipt receiver and result sender channels.
47    pub const fn new(
48        receipt_rx: Receiver<IndexedReceipt<R>>,
49        result_tx: oneshot::Sender<(B256, Bloom)>,
50    ) -> Self {
51        Self { receipt_rx, result_tx }
52    }
53
54    /// Runs the receipt root computation, consuming the handle.
55    ///
56    /// This method receives indexed receipts from the channel, encodes them,
57    /// and builds the trie incrementally. When all receipts have been received
58    /// (channel closed), it sends the result through the oneshot channel.
59    ///
60    /// This is designed to be called inside a blocking task (e.g., via
61    /// `executor.spawn_blocking(move || handle.run(receipts_len))`).
62    ///
63    /// # Arguments
64    ///
65    /// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
66    ///   the trie keys according to RLP encoding rules.
67    pub fn run(self, receipts_len: usize) {
68        let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
69        let mut aggregated_bloom = Bloom::ZERO;
70        let mut encode_buf = Vec::new();
71        let mut received_count = 0usize;
72
73        for indexed_receipt in self.receipt_rx {
74            let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
75
76            encode_buf.clear();
77            receipt_with_bloom.encode_2718(&mut encode_buf);
78
79            aggregated_bloom |= *receipt_with_bloom.bloom_ref();
80            match builder.push(indexed_receipt.index, &encode_buf) {
81                Ok(()) => {
82                    received_count += 1;
83                }
84                Err(err) => {
85                    // If a duplicate or out-of-bounds index is streamed, skip it and
86                    // fall back to computing the receipt root from the full receipts
87                    // vector later.
88                    tracing::error!(
89                        target: "engine::tree::payload_processor",
90                        index = indexed_receipt.index,
91                        ?err,
92                        "Receipt root task received invalid receipt index, skipping"
93                    );
94                }
95            }
96        }
97
98        let Ok(root) = builder.finalize() else {
99            // Finalize fails if we didn't receive exactly `receipts_len` receipts. This can
100            // happen if execution was aborted early (e.g., invalid transaction encountered).
101            // We return without sending a result, allowing the caller to handle the abort.
102            tracing::error!(
103                target: "engine::tree::payload_processor",
104                expected = receipts_len,
105                received = received_count,
106                "Receipt root task received incomplete receipts, execution likely aborted"
107            );
108            return;
109        };
110        let _ = self.result_tx.send((root, aggregated_bloom));
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117    use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
118    use alloy_primitives::{b256, hex, Address, Bytes, Log};
119    use crossbeam_channel::bounded;
120    use reth_ethereum_primitives::{Receipt, TxType};
121
122    #[tokio::test]
123    async fn test_receipt_root_task_empty() {
124        let (_tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
125        let (result_tx, result_rx) = oneshot::channel();
126        drop(_tx);
127
128        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
129        tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();
130
131        let (root, bloom) = result_rx.await.unwrap();
132
133        // Empty trie root
134        assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
135        assert_eq!(bloom, Bloom::ZERO);
136    }
137
138    #[tokio::test]
139    async fn test_receipt_root_task_single_receipt() {
140        let receipts: Vec<Receipt> = vec![Receipt::default()];
141
142        let (tx, rx) = bounded(1);
143        let (result_tx, result_rx) = oneshot::channel();
144        let receipts_len = receipts.len();
145
146        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
147        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
148
149        for (i, receipt) in receipts.clone().into_iter().enumerate() {
150            tx.send(IndexedReceipt::new(i, receipt)).unwrap();
151        }
152        drop(tx);
153
154        join_handle.await.unwrap();
155        let (root, _bloom) = result_rx.await.unwrap();
156
157        // Verify against the standard calculation
158        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
159        let expected_root = calculate_receipt_root(&receipts_with_bloom);
160
161        assert_eq!(root, expected_root);
162    }
163
164    #[tokio::test]
165    async fn test_receipt_root_task_multiple_receipts() {
166        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
167
168        let (tx, rx) = bounded(4);
169        let (result_tx, result_rx) = oneshot::channel();
170        let receipts_len = receipts.len();
171
172        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
173        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
174
175        for (i, receipt) in receipts.into_iter().enumerate() {
176            tx.send(IndexedReceipt::new(i, receipt)).unwrap();
177        }
178        drop(tx);
179
180        join_handle.await.unwrap();
181        let (root, bloom) = result_rx.await.unwrap();
182
183        // Verify against expected values from existing test
184        assert_eq!(
185            root,
186            b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
187        );
188        assert_eq!(
189            bloom,
190            Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
191        );
192    }
193
194    #[tokio::test]
195    async fn test_receipt_root_matches_standard_calculation() {
196        // Create some receipts with actual data
197        let receipts = vec![
198            Receipt {
199                tx_type: TxType::Legacy,
200                cumulative_gas_used: 21000,
201                success: true,
202                logs: vec![],
203            },
204            Receipt {
205                tx_type: TxType::Eip1559,
206                cumulative_gas_used: 42000,
207                success: true,
208                logs: vec![Log {
209                    address: Address::ZERO,
210                    data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
211                }],
212            },
213            Receipt {
214                tx_type: TxType::Eip2930,
215                cumulative_gas_used: 63000,
216                success: false,
217                logs: vec![],
218            },
219        ];
220
221        // Calculate expected values first (before we move receipts)
222        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
223        let expected_root = calculate_receipt_root(&receipts_with_bloom);
224        let expected_bloom =
225            receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
226
227        // Calculate using the task
228        let (tx, rx) = bounded(4);
229        let (result_tx, result_rx) = oneshot::channel();
230        let receipts_len = receipts.len();
231
232        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
233        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
234
235        for (i, receipt) in receipts.into_iter().enumerate() {
236            tx.send(IndexedReceipt::new(i, receipt)).unwrap();
237        }
238        drop(tx);
239
240        join_handle.await.unwrap();
241        let (task_root, task_bloom) = result_rx.await.unwrap();
242
243        assert_eq!(task_root, expected_root);
244        assert_eq!(task_bloom, expected_bloom);
245    }
246
247    #[tokio::test]
248    async fn test_receipt_root_task_out_of_order() {
249        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
250
251        // Calculate expected values first (before we move receipts)
252        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
253        let expected_root = calculate_receipt_root(&receipts_with_bloom);
254
255        let (tx, rx) = bounded(4);
256        let (result_tx, result_rx) = oneshot::channel();
257        let receipts_len = receipts.len();
258
259        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
260        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
261
262        // Send in reverse order to test out-of-order handling
263        for (i, receipt) in receipts.into_iter().enumerate().rev() {
264            tx.send(IndexedReceipt::new(i, receipt)).unwrap();
265        }
266        drop(tx);
267
268        join_handle.await.unwrap();
269        let (root, _bloom) = result_rx.await.unwrap();
270
271        assert_eq!(root, expected_root);
272    }
273}