Skip to main content

reth_chain_state/
deferred_trie.rs

1use parking_lot::Mutex;
2use reth_metrics::{metrics::Counter, Metrics};
3use reth_trie::{
4    updates::{TrieUpdates, TrieUpdatesSorted},
5    HashedPostState, HashedPostStateSorted,
6};
7use std::{
8    fmt,
9    sync::{Arc, LazyLock},
10};
11use tracing::{debug_span, instrument};
12
13/// Shared handle to asynchronously populated per-block trie data.
14///
15/// If the background task has not completed by the time trie data is needed, the caller computes
16/// the sorted data synchronously from the retained unsorted inputs and caches the result.
17#[derive(Clone)]
18pub struct DeferredTrieData {
19    /// Shared deferred state holding either raw inputs (pending) or computed result (ready).
20    state: Arc<Mutex<DeferredTrieDataInner>>,
21}
22
23/// Sorted trie data computed for one executed block.
24///
25/// Cumulative overlays are intentionally managed by
26/// [`StateTrieOverlayManager`](crate::StateTrieOverlayManager), not by each block.
27#[derive(Clone, Debug, Default)]
28pub struct ComputedTrieData {
29    /// Sorted hashed post-state produced by execution.
30    pub hashed_state: Arc<HashedPostStateSorted>,
31    /// Sorted trie updates produced by state root computation.
32    pub trie_updates: Arc<TrieUpdatesSorted>,
33}
34
35/// Metrics for deferred trie computation.
36#[derive(Metrics)]
37#[metrics(scope = "sync.block_validation")]
38struct DeferredTrieMetrics {
39    /// Number of times deferred trie data was ready (async task completed first).
40    deferred_trie_async_ready: Counter,
41    /// Number of times deferred trie data required synchronous computation (fallback path).
42    deferred_trie_sync_fallback: Counter,
43}
44
45static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
46    LazyLock::new(DeferredTrieMetrics::default);
47
48/// Internal state for deferred trie data.
49enum DeferredTrieDataInner {
50    /// Data is not yet available; raw inputs stored for fallback computation.
51    ///
52    /// Wrapped in `Option` to allow taking ownership during computation.
53    Pending(Option<PendingInputs>),
54    /// Data has been computed and is ready.
55    Ready(ComputedTrieData),
56}
57
58/// Inputs kept while a deferred trie computation is pending.
59#[derive(Clone, Debug)]
60struct PendingInputs {
61    /// Unsorted hashed post-state from execution.
62    hashed_state: Arc<HashedPostState>,
63    /// Unsorted trie updates from state root computation.
64    trie_updates: Arc<TrieUpdates>,
65}
66
67impl fmt::Debug for DeferredTrieData {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        let state = self.state.lock();
70        match &*state {
71            DeferredTrieDataInner::Pending(_) => {
72                f.debug_struct("DeferredTrieData").field("state", &"pending").finish()
73            }
74            DeferredTrieDataInner::Ready(_) => {
75                f.debug_struct("DeferredTrieData").field("state", &"ready").finish()
76            }
77        }
78    }
79}
80
81impl DeferredTrieData {
82    /// Create a new pending handle with fallback inputs for synchronous computation.
83    pub fn pending(hashed_state: Arc<HashedPostState>, trie_updates: Arc<TrieUpdates>) -> Self {
84        Self {
85            state: Arc::new(Mutex::new(DeferredTrieDataInner::Pending(Some(PendingInputs {
86                hashed_state,
87                trie_updates,
88            })))),
89        }
90    }
91
92    /// Create a handle that is already populated with the given [`ComputedTrieData`].
93    pub fn ready(bundle: ComputedTrieData) -> Self {
94        Self { state: Arc::new(Mutex::new(DeferredTrieDataInner::Ready(bundle))) }
95    }
96
97    /// Sorts block execution outputs.
98    pub fn sort(
99        hashed_state: Arc<HashedPostState>,
100        trie_updates: Arc<TrieUpdates>,
101    ) -> ComputedTrieData {
102        let _span = debug_span!(target: "engine::tree::deferred_trie", "sort_inputs").entered();
103
104        #[cfg(feature = "rayon")]
105        let (sorted_hashed_state, sorted_trie_updates) = rayon::join(
106            || match Arc::try_unwrap(hashed_state) {
107                Ok(state) => state.into_sorted(),
108                Err(arc) => arc.clone_into_sorted(),
109            },
110            || match Arc::try_unwrap(trie_updates) {
111                Ok(updates) => updates.into_sorted(),
112                Err(arc) => arc.clone_into_sorted(),
113            },
114        );
115
116        #[cfg(not(feature = "rayon"))]
117        let (sorted_hashed_state, sorted_trie_updates) = (
118            match Arc::try_unwrap(hashed_state) {
119                Ok(state) => state.into_sorted(),
120                Err(arc) => arc.clone_into_sorted(),
121            },
122            match Arc::try_unwrap(trie_updates) {
123                Ok(updates) => updates.into_sorted(),
124                Err(arc) => arc.clone_into_sorted(),
125            },
126        );
127
128        ComputedTrieData::new(Arc::new(sorted_hashed_state), Arc::new(sorted_trie_updates))
129    }
130
131    /// Returns trie data, computing synchronously if the async task hasn't completed.
132    #[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
133    pub fn wait_cloned(&self) -> ComputedTrieData {
134        let mut state = self.state.lock();
135        match &mut *state {
136            DeferredTrieDataInner::Ready(bundle) => {
137                DEFERRED_TRIE_METRICS.deferred_trie_async_ready.increment(1);
138                bundle.clone()
139            }
140            DeferredTrieDataInner::Pending(maybe_inputs) => {
141                DEFERRED_TRIE_METRICS.deferred_trie_sync_fallback.increment(1);
142
143                let inputs = maybe_inputs.take().expect("inputs must be present in Pending state");
144                let computed = Self::sort(inputs.hashed_state, inputs.trie_updates);
145                *state = DeferredTrieDataInner::Ready(computed.clone());
146
147                computed
148            }
149        }
150    }
151}
152
153impl ComputedTrieData {
154    /// Construct sorted trie data for one block.
155    pub const fn new(
156        hashed_state: Arc<HashedPostStateSorted>,
157        trie_updates: Arc<TrieUpdatesSorted>,
158    ) -> Self {
159        Self { hashed_state, trie_updates }
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{map::B256Map, B256, U256};
    use reth_primitives_traits::Account;
    use reth_trie::{updates::TrieUpdates, HashedStorage};
    use std::{
        thread,
        time::{Duration, Instant},
    };

    /// Builds a pending handle whose hashed state and trie updates are both empty.
    fn empty_pending() -> DeferredTrieData {
        let hashed_state = Arc::new(HashedPostState::default());
        let trie_updates = Arc::new(TrieUpdates::default());
        DeferredTrieData::pending(hashed_state, trie_updates)
    }

    /// A handle constructed as ready hands back data matching what it was given.
    #[test]
    fn ready_returns_immediately() {
        let expected = ComputedTrieData::default();
        let handle = DeferredTrieData::ready(expected.clone());

        let actual = handle.wait_cloned();

        assert_eq!(actual.hashed_state.total_len(), expected.hashed_state.total_len());
        assert_eq!(actual.trie_updates.total_len(), expected.trie_updates.total_len());
    }

    /// The second call on a pending handle returns the allocations produced by the first.
    #[test]
    fn pending_computes_and_caches_result() {
        let handle = empty_pending();

        let initial = handle.wait_cloned();
        let repeated = handle.wait_cloned();

        assert!(Arc::ptr_eq(&initial.hashed_state, &repeated.hashed_state));
        assert!(Arc::ptr_eq(&initial.trie_updates, &repeated.trie_updates));
    }

    /// Two threads waiting on clones of one handle end up with the same allocations.
    #[test]
    fn concurrent_waits_share_computed_result() {
        let local_handle = empty_pending();
        let remote_handle = local_handle.clone();

        let worker = thread::spawn(move || remote_handle.wait_cloned());
        let local_result = local_handle.wait_cloned();
        let remote_result = worker.join().unwrap();

        assert!(Arc::ptr_eq(&local_result.hashed_state, &remote_result.hashed_state));
        assert!(Arc::ptr_eq(&local_result.trie_updates, &remote_result.trie_updates));
    }

    /// One account plus one storage slot are reported as two entries after sorting.
    #[test]
    fn sorts_non_empty_inputs() {
        let hashed_address = B256::with_last_byte(1);
        let hashed_slot = B256::with_last_byte(2);
        let storage = HashedStorage::from_iter(false, [(hashed_slot, U256::from(1))]);
        let hashed_state = HashedPostState::default()
            .with_accounts([(hashed_address, Some(Account::default()))])
            .with_storages([(hashed_address, storage)]);

        let handle =
            DeferredTrieData::pending(Arc::new(hashed_state), Arc::new(TrieUpdates::default()));
        let sorted = handle.wait_cloned();

        assert_eq!(sorted.hashed_state.total_len(), 2);
        assert_eq!(sorted.trie_updates.total_len(), 0);
    }

    /// Once the result has been computed, a further call returns within 10 milliseconds.
    #[test]
    fn wait_does_not_block_after_first_compute() {
        let accounts: B256Map<_> =
            (0..100).map(|i| (B256::with_last_byte(i), Some(Account::default()))).collect();
        let handle = DeferredTrieData::pending(
            Arc::new(HashedPostState { accounts, storages: Default::default() }),
            Arc::new(TrieUpdates::default()),
        );

        // The first call pays for the computation; only the second one is timed.
        let _ = handle.wait_cloned();
        let timer = Instant::now();
        let _ = handle.wait_cloned();

        assert!(timer.elapsed() < Duration::from_millis(10));
    }
}