Skip to main content

reth_engine_tree/tree/payload_processor/
receipt_root_task.rs

1//! Receipt root computation in a background task.
2//!
3//! This module provides a streaming receipt root builder that computes the receipt trie root
4//! in a background thread. Receipts are sent via a channel with their index, and for each
5//! receipt received, the builder incrementally flushes leaves to the underlying
6//! [`OrderedTrieRootEncodedBuilder`] when possible. When the channel closes, the task returns the
7//! computed root.
8
9use alloy_eips::Encodable2718;
10use alloy_primitives::{map::HashMap, Bloom, B256};
11use crossbeam_channel::Receiver;
12use reth_primitives_traits::Receipt;
13use reth_trie_common::ordered_root::OrderedTrieRootEncodedBuilder;
14use tokio::sync::oneshot;
15use tracing::debug_span;
16
/// Initial capacity of the scratch buffer that each receipt is encoded into before being
/// pushed to the trie builder. The buffer is reused across receipts, so this is only a
/// starting size.
const RECEIPT_ENCODE_BUF_INITIAL_CAPACITY: usize = 512;
18
/// A receipt paired with its position in the block, as handed to the background task that
/// encodes receipts and builds the receipt trie.
#[derive(Clone, Debug)]
pub struct IndexedReceipt<R> {
    /// Index of the transaction (within the block) that produced this receipt.
    pub index: usize,
    /// The receipt itself.
    pub receipt: R,
}

impl<R> IndexedReceipt<R> {
    /// Pairs `receipt` with the `index` of its transaction.
    #[inline]
    pub const fn new(index: usize, receipt: R) -> Self {
        IndexedReceipt { index, receipt }
    }
}
35
36/// Handle for running the receipt root computation in a background task.
37///
38/// This struct holds the channels needed to receive receipts and send the result.
39/// Use [`Self::run`] to execute the computation (typically in a spawned blocking task).
40#[derive(Debug)]
41pub struct ReceiptRootTaskHandle<R> {
42    /// Receiver for indexed receipts.
43    receipt_rx: Receiver<IndexedReceipt<R>>,
44    /// Sender for the computed result.
45    result_tx: oneshot::Sender<(B256, Bloom)>,
46}
47
48impl<R: Receipt> ReceiptRootTaskHandle<R> {
49    /// Creates a new handle from the receipt receiver and result sender channels.
50    pub const fn new(
51        receipt_rx: Receiver<IndexedReceipt<R>>,
52        result_tx: oneshot::Sender<(B256, Bloom)>,
53    ) -> Self {
54        Self { receipt_rx, result_tx }
55    }
56
57    /// Runs the receipt root computation, consuming the handle.
58    ///
59    /// This method receives indexed receipts from the channel, encodes them,
60    /// and builds the trie incrementally. When all receipts have been received
61    /// (channel closed), it sends the result through the oneshot channel.
62    ///
63    /// This is designed to be called inside a blocking task (e.g., via
64    /// `executor.spawn_blocking(move || handle.run(receipts_len))`).
65    ///
66    /// # Arguments
67    ///
68    /// * `receipts_len` - The total number of receipts expected. This is needed to correctly order
69    ///   the trie keys according to RLP encoding rules.
70    pub fn run(self, receipts_len: impl Into<Option<usize>>) {
71        let receipts_len = receipts_len.into();
72
73        let _span = debug_span!(
74            target: "engine::tree::payload_processor",
75            "receipt_root",
76            receipts_len,
77        )
78        .entered();
79
80        let mut builder = OrderedTrieRootEncodedBuilder::new();
81        let mut aggregated_bloom = Bloom::ZERO;
82        let mut encode_buf = Vec::with_capacity(RECEIPT_ENCODE_BUF_INITIAL_CAPACITY);
83        let mut next = 0usize;
84        let mut pending = HashMap::new();
85
86        let mut push = |receipt: R| {
87            let receipt_with_bloom = receipt.with_bloom_ref();
88
89            encode_buf.clear();
90            receipt_with_bloom.encode_2718(&mut encode_buf);
91
92            aggregated_bloom |= *receipt_with_bloom.bloom_ref();
93            builder.push_next(&encode_buf);
94        };
95
96        for indexed_receipt in self.receipt_rx {
97            if indexed_receipt.index == next {
98                push(indexed_receipt.receipt);
99                next += 1;
100
101                while let Some(receipt) = pending.remove(&next) {
102                    push(receipt);
103                    next += 1;
104                }
105            } else {
106                pending.insert(indexed_receipt.index, indexed_receipt.receipt);
107            }
108        }
109
110        if receipts_len.is_some_and(|len| len != next) {
111            tracing::error!(
112                target: "engine::tree::payload_processor",
113                expected = receipts_len,
114                received = next,
115                "Receipt root task received incomplete receipts, execution likely aborted"
116            );
117            return;
118        }
119
120        if !pending.is_empty() {
121            tracing::error!(
122                target: "engine::tree::payload_processor",
123                received = next,
124                pending = pending.len(),
125                "Receipt root task received gapped receipts"
126            );
127            return;
128        }
129
130        let _ = self.result_tx.send((builder.finalize(), aggregated_bloom));
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{proofs::calculate_receipt_root, TxReceipt};
    use alloy_primitives::{b256, Address, Bytes, Log};
    use crossbeam_channel::bounded;
    use reth_ethereum_primitives::{Receipt, TxType};

    /// Runs the task on a blocking thread, feeds it `indexed` in iteration order over a
    /// channel of the given capacity, closes the channel and returns what the task reported.
    async fn root_and_bloom_via_task(
        indexed: impl IntoIterator<Item = (usize, Receipt)>,
        receipts_len: usize,
        channel_capacity: usize,
    ) -> (B256, Bloom) {
        let (tx, rx) = bounded(channel_capacity);
        let (result_tx, result_rx) = oneshot::channel();

        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
        let task = tokio::task::spawn_blocking(move || handle.run(receipts_len));

        for (index, receipt) in indexed {
            tx.send(IndexedReceipt::new(index, receipt)).unwrap();
        }
        drop(tx);

        task.await.unwrap();
        result_rx.await.unwrap()
    }

    /// Root and aggregated bloom as produced by the standard, non-streaming calculation.
    fn reference_root_and_bloom(receipts: &[Receipt]) -> (B256, Bloom) {
        let receipts_with_bloom: Vec<_> = receipts.iter().map(|r| r.with_bloom_ref()).collect();
        let bloom = receipts_with_bloom.iter().fold(Bloom::ZERO, |bloom, r| bloom | r.bloom_ref());
        (calculate_receipt_root(&receipts_with_bloom), bloom)
    }

    #[tokio::test]
    async fn test_receipt_root_task_empty() {
        let (tx, rx) = bounded::<IndexedReceipt<Receipt>>(1);
        let (result_tx, result_rx) = oneshot::channel();
        // Close the channel before the task even starts.
        drop(tx);

        let handle = ReceiptRootTaskHandle::new(rx, result_tx);
        tokio::task::spawn_blocking(move || handle.run(0)).await.unwrap();

        let (root, bloom) = result_rx.await.unwrap();

        // Empty trie root
        assert_eq!(root, reth_trie_common::EMPTY_ROOT_HASH);
        assert_eq!(bloom, Bloom::ZERO);
    }

    #[tokio::test]
    async fn test_receipt_root_task_single_receipt() {
        let receipts: Vec<Receipt> = vec![Receipt::default()];
        let (expected_root, _) = reference_root_and_bloom(&receipts);
        let receipts_len = receipts.len();

        let (root, _bloom) =
            root_and_bloom_via_task(receipts.into_iter().enumerate(), receipts_len, 1).await;

        // Must agree with the standard calculation
        assert_eq!(root, expected_root);
    }

    #[tokio::test]
    async fn test_receipt_root_task_multiple_receipts() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];
        let receipts_len = receipts.len();

        let (root, bloom) =
            root_and_bloom_via_task(receipts.into_iter().enumerate(), receipts_len, 4).await;

        // Known-good values for five default receipts: fixed root, all-zero bloom
        assert_eq!(
            root,
            b256!("0x61353b4fb714dc1fccacbf7eafc4273e62f3d1eed716fe41b2a0cd2e12c63ebc")
        );
        assert_eq!(bloom, Bloom::ZERO);
    }

    #[tokio::test]
    async fn test_receipt_root_matches_standard_calculation() {
        // Receipts carrying actual data: mixed tx types, a log, and a failed transaction
        let receipts = vec![
            Receipt {
                tx_type: TxType::Legacy,
                cumulative_gas_used: 21000,
                success: true,
                logs: vec![],
            },
            Receipt {
                tx_type: TxType::Eip1559,
                cumulative_gas_used: 42000,
                success: true,
                logs: vec![Log {
                    address: Address::ZERO,
                    data: alloy_primitives::LogData::new_unchecked(vec![B256::ZERO], Bytes::new()),
                }],
            },
            Receipt {
                tx_type: TxType::Eip2930,
                cumulative_gas_used: 63000,
                success: false,
                logs: vec![],
            },
        ];

        // Reference values are computed up front, before the receipts are moved into the task
        let (expected_root, expected_bloom) = reference_root_and_bloom(&receipts);
        let receipts_len = receipts.len();

        let (task_root, task_bloom) =
            root_and_bloom_via_task(receipts.into_iter().enumerate(), receipts_len, 4).await;

        assert_eq!(task_root, expected_root);
        assert_eq!(task_bloom, expected_bloom);
    }

    #[tokio::test]
    async fn test_receipt_root_task_out_of_order() {
        let receipts: Vec<Receipt> = vec![Receipt::default(); 5];

        // Reference value is computed up front, before the receipts are moved into the task
        let (expected_root, _) = reference_root_and_bloom(&receipts);
        let receipts_len = receipts.len();

        // Feed the receipts highest index first to exercise the out-of-order buffering
        let (root, _bloom) =
            root_and_bloom_via_task(receipts.into_iter().enumerate().rev(), receipts_len, 4)
                .await;

        assert_eq!(root, expected_root);
    }
}