Skip to main content

reth_chain_state/
deferred_trie.rs

1use reth_metrics::{metrics::Counter, Metrics};
2use reth_trie::{
3    updates::{TrieUpdates, TrieUpdatesSorted},
4    HashedPostState, HashedPostStateSorted,
5};
6use std::{
7    fmt,
8    sync::{Arc, LazyLock, OnceLock},
9};
10use tracing::{debug_span, instrument};
11
12/// Shared handle to asynchronously populated sorted per-block trie data.
13///
14/// The corresponding [`DeferredTrieDataProducer`] owns the unsorted inputs and publishes the sorted
15/// data when the background task completes. Callers wait for that result instead of computing it
16/// synchronously.
17#[derive(Clone)]
18pub struct DeferredTrieData {
19    /// Shared deferred result populated by the corresponding [`DeferredTrieDataProducer`].
20    value: Arc<OnceLock<ComputedTrieData>>,
21}
22
23/// Producer consumed by a spawned task to compute sorted trie data for a [`DeferredTrieData`]
24/// handle.
25#[must_use = "DeferredTrieDataProducer must be consumed with compute_and_publish to wake trie data waiters"]
26pub struct DeferredTrieDataProducer {
27    /// Shared result initialized exactly once by this producer.
28    value: Arc<OnceLock<ComputedTrieData>>,
29    /// Unsorted inputs consumed when the producer computes trie data.
30    inputs: PendingInputs,
31}
32
33impl fmt::Debug for DeferredTrieDataProducer {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.debug_struct("DeferredTrieDataProducer")
36            .field("inputs", &self.inputs)
37            .finish_non_exhaustive()
38    }
39}
40
41impl DeferredTrieDataProducer {
42    /// Computes sorted trie data, publishes it to waiters, and returns it to the task owner.
43    pub fn compute_and_publish(self) -> ComputedTrieData {
44        let Self { value, inputs } = self;
45        let computed = DeferredTrieData::sort(inputs.hashed_state, inputs.trie_updates);
46        let _ = value.set(computed.clone());
47        computed
48    }
49}
50
51/// Sorted trie data computed for one executed block.
52///
53/// Cumulative overlays are intentionally managed by
54/// [`StateTrieOverlayManager`](crate::StateTrieOverlayManager), not by each block.
55#[derive(Clone, Debug, Default)]
56pub struct ComputedTrieData {
57    /// Sorted hashed post-state produced by execution.
58    pub hashed_state: Arc<HashedPostStateSorted>,
59    /// Sorted trie updates produced by state root computation.
60    pub trie_updates: Arc<TrieUpdatesSorted>,
61}
62
/// Metrics for deferred trie computation.
// NOTE(review): the `Metrics` derive presumably uses the field doc comments below as metric
// descriptions — confirm before rewording them.
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
struct DeferredTrieMetrics {
    /// Number of times deferred trie data was ready (async task completed first).
    // Incremented by `DeferredTrieData::wait_cloned` when the value is already published.
    deferred_trie_async_ready: Counter,
    /// Number of times deferred trie data required waiting for the publishing task.
    // Incremented by `DeferredTrieData::wait_cloned` right before it blocks on the cell.
    deferred_trie_task_wait: Counter,
}
72
/// Shared [`DeferredTrieMetrics`] instance, constructed lazily on first access.
static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
    LazyLock::new(DeferredTrieMetrics::default);
75
76/// Inputs kept while a deferred trie computation is pending.
77#[derive(Clone, Debug)]
78struct PendingInputs {
79    /// Unsorted hashed post-state from execution.
80    hashed_state: Arc<HashedPostState>,
81    /// Unsorted trie updates from state root computation.
82    trie_updates: Arc<TrieUpdates>,
83}
84
85impl fmt::Debug for DeferredTrieData {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        f.debug_struct("DeferredTrieData")
88            .field("state", &if self.value.get().is_some() { "ready" } else { "pending" })
89            .finish()
90    }
91}
92
93impl DeferredTrieData {
94    /// Create a new pending handle and task that will publish the computed trie data.
95    pub fn pending(
96        hashed_state: Arc<HashedPostState>,
97        trie_updates: Arc<TrieUpdates>,
98    ) -> (Self, DeferredTrieDataProducer) {
99        let value = Arc::new(OnceLock::new());
100        (
101            Self { value: Arc::clone(&value) },
102            DeferredTrieDataProducer {
103                value,
104                inputs: PendingInputs { hashed_state, trie_updates },
105            },
106        )
107    }
108
109    /// Create a handle that is already populated with the given [`ComputedTrieData`].
110    pub fn ready(bundle: ComputedTrieData) -> Self {
111        Self { value: Arc::new(OnceLock::from(bundle)) }
112    }
113
114    /// Sorts block execution outputs.
115    pub fn sort(
116        hashed_state: Arc<HashedPostState>,
117        trie_updates: Arc<TrieUpdates>,
118    ) -> ComputedTrieData {
119        let _span = debug_span!(target: "engine::tree::deferred_trie", "sort_inputs").entered();
120
121        #[cfg(feature = "rayon")]
122        let (sorted_hashed_state, sorted_trie_updates) = rayon::join(
123            || match Arc::try_unwrap(hashed_state) {
124                Ok(state) => state.into_sorted(),
125                Err(arc) => arc.clone_into_sorted(),
126            },
127            || match Arc::try_unwrap(trie_updates) {
128                Ok(updates) => updates.into_sorted(),
129                Err(arc) => arc.clone_into_sorted(),
130            },
131        );
132
133        #[cfg(not(feature = "rayon"))]
134        let (sorted_hashed_state, sorted_trie_updates) = (
135            match Arc::try_unwrap(hashed_state) {
136                Ok(state) => state.into_sorted(),
137                Err(arc) => arc.clone_into_sorted(),
138            },
139            match Arc::try_unwrap(trie_updates) {
140                Ok(updates) => updates.into_sorted(),
141                Err(arc) => arc.clone_into_sorted(),
142            },
143        );
144
145        ComputedTrieData::new(Arc::new(sorted_hashed_state), Arc::new(sorted_trie_updates))
146    }
147
148    /// Returns trie data, waiting for the async publishing task if it has not completed.
149    #[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
150    pub fn wait_cloned(&self) -> ComputedTrieData {
151        let bundle = match self.value.get() {
152            Some(bundle) => {
153                DEFERRED_TRIE_METRICS.deferred_trie_async_ready.increment(1);
154                bundle
155            }
156            None => {
157                DEFERRED_TRIE_METRICS.deferred_trie_task_wait.increment(1);
158                self.value.wait()
159            }
160        };
161
162        bundle.clone()
163    }
164}
165
166impl ComputedTrieData {
167    /// Construct sorted trie data for one block.
168    pub const fn new(
169        hashed_state: Arc<HashedPostStateSorted>,
170        trie_updates: Arc<TrieUpdatesSorted>,
171    ) -> Self {
172        Self { hashed_state, trie_updates }
173    }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{map::B256Map, B256, U256};
    use reth_primitives_traits::Account;
    use reth_trie::{updates::TrieUpdates, HashedStorage};
    use std::{
        thread,
        time::{Duration, Instant},
    };

    /// Builds a handle/producer pair over empty inputs.
    fn empty_pending() -> (DeferredTrieData, DeferredTrieDataProducer) {
        let hashed_state = Arc::new(HashedPostState::default());
        let trie_updates = Arc::new(TrieUpdates::default());
        DeferredTrieData::pending(hashed_state, trie_updates)
    }

    /// Asserts that both bundles point at the very same `Arc` allocations.
    fn assert_same_allocations(left: &ComputedTrieData, right: &ComputedTrieData) {
        assert!(Arc::ptr_eq(&left.hashed_state, &right.hashed_state));
        assert!(Arc::ptr_eq(&left.trie_updates, &right.trie_updates));
    }

    #[test]
    fn ready_returns_immediately() {
        let expected = ComputedTrieData::default();
        let handle = DeferredTrieData::ready(expected.clone());

        let actual = handle.wait_cloned();

        assert_eq!(actual.hashed_state.total_len(), expected.hashed_state.total_len());
        assert_eq!(actual.trie_updates.total_len(), expected.trie_updates.total_len());
    }

    #[test]
    fn pending_waits_for_task_and_caches_result() {
        let (handle, producer) = empty_pending();

        let published = producer.compute_and_publish();
        let first = handle.wait_cloned();
        let second = handle.wait_cloned();

        assert_same_allocations(&published, &first);
        assert_same_allocations(&first, &second);
    }

    #[test]
    fn pending_wait_blocks_until_task_publishes() {
        let (handle, producer) = empty_pending();

        let waiter = thread::spawn(move || handle.wait_cloned());
        // Nothing has been published yet, so the waiter cannot have returned.
        thread::sleep(Duration::from_millis(20));
        assert!(!waiter.is_finished());

        let published = producer.compute_and_publish();
        let observed = waiter.join().unwrap();

        assert_same_allocations(&published, &observed);
    }

    #[test]
    fn concurrent_waits_share_published_result() {
        let (handle, producer) = empty_pending();
        let other_handle = handle.clone();

        let waiter = thread::spawn(move || other_handle.wait_cloned());
        let published = producer.compute_and_publish();
        let local = handle.wait_cloned();
        let remote = waiter.join().unwrap();

        assert_same_allocations(&published, &local);
        assert_same_allocations(&local, &remote);
    }

    #[test]
    fn sorts_non_empty_inputs() {
        let address = B256::with_last_byte(1);
        let slot = B256::with_last_byte(2);
        let storage = HashedStorage::from_iter(false, [(slot, U256::from(1))]);
        let hashed_state = HashedPostState::default()
            .with_accounts([(address, Some(Account::default()))])
            .with_storages([(address, storage)]);

        let (handle, producer) =
            DeferredTrieData::pending(Arc::new(hashed_state), Arc::new(TrieUpdates::default()));
        let _ = producer.compute_and_publish();
        let sorted = handle.wait_cloned();

        assert_eq!(sorted.hashed_state.total_len(), 2);
        assert_eq!(sorted.trie_updates.total_len(), 0);
    }

    #[test]
    fn wait_does_not_block_after_first_compute() {
        let accounts: B256Map<_> = (0..100u8)
            .map(|byte| (B256::with_last_byte(byte), Some(Account::default())))
            .collect();
        let (handle, producer) = DeferredTrieData::pending(
            Arc::new(HashedPostState { accounts, storages: Default::default() }),
            Arc::new(TrieUpdates::default()),
        );

        let _ = producer.compute_and_publish();
        let _ = handle.wait_cloned();

        // Only the second read is timed: by then the value is already published.
        let started = Instant::now();
        let _ = handle.wait_cloned();

        assert!(started.elapsed() < Duration::from_millis(10));
    }
}