Skip to main content

reth_engine_tree/tree/payload_processor/
receipt_root_task.rs

1//! Receipt root computation in a background task.
2//!
3//! This module provides a streaming receipt root builder that computes the receipt trie root
4//! in a background thread. Receipts are sent via a channel with their index, and for each
5//! receipt received, the builder incrementally flushes leaves to the underlying
6//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the
7//! computed root.
8
9use alloy_eips::Encodable2718;
10use alloy_primitives::{Bloom, B256};
11use crossbeam_channel::Receiver;
12use reth_primitives_traits::Receipt;
13use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
14use tokio::sync::oneshot;
15use tracing::debug_span;
16
/// A receipt paired with its position in the block.
///
/// This is the message type sent to the background task, which encodes the receipt and
/// inserts it into the receipt trie under `index`.
#[derive(Clone, Debug)]
pub struct IndexedReceipt<R> {
    /// Index of the transaction within the block.
    pub index: usize,
    /// The receipt belonging to that index.
    pub receipt: R,
}
25
26impl<R> IndexedReceipt<R> {
27    /// Creates a new indexed receipt.
28    #[inline]
29    pub const fn new(index: usize, receipt: R) -> Self {
30        Self { index, receipt }
31    }
32}
33
34/// Handle for running the receipt root computation in a background task.
35///
36/// This struct holds the channels needed to receive receipts and send the result.
37/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task).
38#[derive(Debug)]
39pub struct ReceiptRootTaskHandle<R> {
40    /// Receiver for indexed receipts.
41    receipt_rx: Receiver<IndexedReceipt<R>>,
42    /// Sender for the computed result.
43    result_tx: oneshot::Sender<(B256, Bloom)>,
44}
45
46impl<R: Receipt> ReceiptRootTaskHandle<R> {
47    /// Creates a new handle from the receipt receiver and result sender channels.
48    pub const fn new(
49        receipt_rx: Receiver<IndexedReceipt<R>>,
50        result_tx: oneshot::Sender<(B256, Bloom)>,
51    ) -> Self {
52        Self { receipt_rx, result_tx }
53    }
54
55    /// Runs the receipt root computation, consuming the handle.
56    ///
57    /// This method receives indexed receipts from the channel, encodes them,
58    /// and builds the trie incrementally. When all receipts have been received
59    /// (channel closed), it sends the result through the oneshot channel.
60    ///
61    /// This is designed to be called inside a blocking task (e.g., via
62    /// `executor.spawn_blocking(move || handle.run(receipts_len))`).
63    ///
64    /// # Arguments
65    ///
66    /// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
67    ///   the trie keys according to RLP encoding rules.
68    pub fn run(self, receipts_len: usize) {
69        let _span = debug_span!(
70            target: "engine::tree::payload_processor",
71            "receipt_root",
72            receipts_len,
73        )
74        .entered();
75
76        let mut builder = OrderedTrieRootEncodedBuilder::new(receipts_len);
77        let mut aggregated_bloom = Bloom::ZERO;
78        let mut encode_buf = Vec::new();
79        let mut received_count = 0usize;
80
81        for indexed_receipt in self.receipt_rx {
82            let receipt_with_bloom = indexed_receipt.receipt.with_bloom_ref();
83
84            encode_buf.clear();
85            receipt_with_bloom.encode_2718(&mut encode_buf);
86
87            aggregated_bloom |= *receipt_with_bloom.bloom_ref();
88            match builder.push(indexed_receipt.index, &encode_buf) {
89                Ok(()) => {
90                    received_count += 1;
91                }
92                Err(err) => {
93                    // If a duplicate or out-of-bounds index is streamed, skip it and
94                    // fall back to computing the receipt root from the full receipts
95                    // vector later.
96                    tracing::error!(
97                        target: "engine::tree::payload_processor",
98                        index = indexed_receipt.index,
99                        ?err,
100                        "Receipt root task received invalid receipt index, skipping"
101                    );
102                }
103            }
104        }
105
106        let Ok(root) = builder.finalize() else {
107            // Finalize fails if we didn't receive exactly `receipts_len` receipts. This can
108            // happen if execution was aborted early (e.g., invalid transaction encountered).
109            // We return without sending a result, allowing the caller to handle the abort.
110            tracing::error!(
111                target: "engine::tree::payload_processor",
112                expected = receipts_len,
113                received = received_count,
114                "Receipt root task received incomplete receipts, execution likely aborted"
115            );
116            return;
117        };
118        let _ = self.result_tx.send((root, aggregated_bloom));
119    }
120}
121
122#[cfg(test)]
123mod tests {
124    use super::*;
125    use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
126    use alloy_primitives::{b256, hex, Address, Bytes, Log};
127    use crossbeam_channel::bounded;
128    use reth_ethereum_primitives::{Receipt, TxType};
129
130    #[tokio::test]
131    async fn test_receipt_root_task_empty() {
132        let (_tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
133        let (result_tx, result_rx) = oneshot::channel();
134        drop(_tx);
135
136        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
137        tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();
138
139        let (root, bloom) = result_rx.await.unwrap();
140
141        // Empty trie root
142        assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
143        assert_eq!(bloom, Bloom::ZERO);
144    }
145
146    #[tokio::test]
147    async fn test_receipt_root_task_single_receipt() {
148        let receipts: Vec<Receipt> = vec![Receipt::default()];
149
150        let (tx, rx) = bounded(1);
151        let (result_tx, result_rx) = oneshot::channel();
152        let receipts_len = receipts.len();
153
154        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
155        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
156
157        for (i, receipt) in receipts.clone().into_iter().enumerate() {
158            tx.send(IndexedReceipt::new(i, receipt)).unwrap();
159        }
160        drop(tx);
161
162        join_handle.await.unwrap();
163        let (root, _bloom) = result_rx.await.unwrap();
164
165        // Verify against the standard calculation
166        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
167        let expected_root = calculate_receipt_root(&receipts_with_bloom);
168
169        assert_eq!(root, expected_root);
170    }
171
172    #[tokio::test]
173    async fn test_receipt_root_task_multiple_receipts() {
174        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
175
176        let (tx, rx) = bounded(4);
177        let (result_tx, result_rx) = oneshot::channel();
178        let receipts_len = receipts.len();
179
180        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
181        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
182
183        for (i, receipt) in receipts.into_iter().enumerate() {
184            tx.send(IndexedReceipt::new(i, receipt)).unwrap();
185        }
186        drop(tx);
187
188        join_handle.await.unwrap();
189        let (root, bloom) = result_rx.await.unwrap();
190
191        // Verify against expected values from existing test
192        assert_eq!(
193            root,
194            b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
195        );
196        assert_eq!(
197            bloom,
198            Bloom::from(hex!("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
199        );
200    }
201
202    #[tokio::test]
203    async fn test_receipt_root_matches_standard_calculation() {
204        // Create some receipts with actual data
205        let receipts = vec![
206            Receipt {
207                tx_type: TxType::Legacy,
208                cumulative_gas_used: 21000,
209                success: true,
210                logs: vec![],
211            },
212            Receipt {
213                tx_type: TxType::Eip1559,
214                cumulative_gas_used: 42000,
215                success: true,
216                logs: vec![Log {
217                    address: Address::ZERO,
218                    data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
219                }],
220            },
221            Receipt {
222                tx_type: TxType::Eip2930,
223                cumulative_gas_used: 63000,
224                success: false,
225                logs: vec![],
226            },
227        ];
228
229        // Calculate expected values first (before we move receipts)
230        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
231        let expected_root = calculate_receipt_root(&receipts_with_bloom);
232        let expected_bloom =
233            receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
234
235        // Calculate using the task
236        let (tx, rx) = bounded(4);
237        let (result_tx, result_rx) = oneshot::channel();
238        let receipts_len = receipts.len();
239
240        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
241        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
242
243        for (i, receipt) in receipts.into_iter().enumerate() {
244            tx.send(IndexedReceipt::new(i, receipt)).unwrap();
245        }
246        drop(tx);
247
248        join_handle.await.unwrap();
249        let (task_root, task_bloom) = result_rx.await.unwrap();
250
251        assert_eq!(task_root, expected_root);
252        assert_eq!(task_bloom, expected_bloom);
253    }
254
255    #[tokio::test]
256    async fn test_receipt_root_task_out_of_order() {
257        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
258
259        // Calculate expected values first (before we move receipts)
260        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
261        let expected_root = calculate_receipt_root(&receipts_with_bloom);
262
263        let (tx, rx) = bounded(4);
264        let (result_tx, result_rx) = oneshot::channel();
265        let receipts_len = receipts.len();
266
267        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
268        let join_handle = tokio::task::spawn_blocking(move || handle.run(receipts_len));
269
270        // Send in reverse order to test out-of-order handling
271        for (i, receipt) in receipts.into_iter().enumerate().rev() {
272            tx.send(IndexedReceipt::new(i, receipt)).unwrap();
273        }
274        drop(tx);
275
276        join_handle.await.unwrap();
277        let (root, _bloom) = result_rx.await.unwrap();
278
279        assert_eq!(root, expected_root);
280    }
281}