reth_chain_state/
deferred_trie.rs1use parking_lot::Mutex;
2use reth_metrics::{metrics::Counter, Metrics};
3use reth_trie::{
4 updates::{TrieUpdates, TrieUpdatesSorted},
5 HashedPostState, HashedPostStateSorted,
6};
7use std::{
8 fmt,
9 sync::{Arc, LazyLock},
10};
11use tracing::{debug_span, instrument};
12
13#[derive(Clone)]
18pub struct DeferredTrieData {
19 state: Arc<Mutex<DeferredTrieDataInner>>,
21}
22
23#[derive(Clone, Debug, Default)]
28pub struct ComputedTrieData {
29 pub hashed_state: Arc<HashedPostStateSorted>,
31 pub trie_updates: Arc<TrieUpdatesSorted>,
33}
34
35#[derive(Metrics)]
37#[metrics(scope = "sync.block_validation")]
38struct DeferredTrieMetrics {
39 deferred_trie_async_ready: Counter,
41 deferred_trie_sync_fallback: Counter,
43}
44
45static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
46 LazyLock::new(DeferredTrieMetrics::default);
47
48enum DeferredTrieDataInner {
50 Pending(Option<PendingInputs>),
54 Ready(ComputedTrieData),
56}
57
58#[derive(Clone, Debug)]
60struct PendingInputs {
61 hashed_state: Arc<HashedPostState>,
63 trie_updates: Arc<TrieUpdates>,
65}
66
67impl fmt::Debug for DeferredTrieData {
68 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69 let state = self.state.lock();
70 match &*state {
71 DeferredTrieDataInner::Pending(_) => {
72 f.debug_struct("DeferredTrieData").field("state", &"pending").finish()
73 }
74 DeferredTrieDataInner::Ready(_) => {
75 f.debug_struct("DeferredTrieData").field("state", &"ready").finish()
76 }
77 }
78 }
79}
80
81impl DeferredTrieData {
82 pub fn pending(hashed_state: Arc<HashedPostState>, trie_updates: Arc<TrieUpdates>) -> Self {
84 Self {
85 state: Arc::new(Mutex::new(DeferredTrieDataInner::Pending(Some(PendingInputs {
86 hashed_state,
87 trie_updates,
88 })))),
89 }
90 }
91
92 pub fn ready(bundle: ComputedTrieData) -> Self {
94 Self { state: Arc::new(Mutex::new(DeferredTrieDataInner::Ready(bundle))) }
95 }
96
97 pub fn sort(
99 hashed_state: Arc<HashedPostState>,
100 trie_updates: Arc<TrieUpdates>,
101 ) -> ComputedTrieData {
102 let _span = debug_span!(target: "engine::tree::deferred_trie", "sort_inputs").entered();
103
104 #[cfg(feature = "rayon")]
105 let (sorted_hashed_state, sorted_trie_updates) = rayon::join(
106 || match Arc::try_unwrap(hashed_state) {
107 Ok(state) => state.into_sorted(),
108 Err(arc) => arc.clone_into_sorted(),
109 },
110 || match Arc::try_unwrap(trie_updates) {
111 Ok(updates) => updates.into_sorted(),
112 Err(arc) => arc.clone_into_sorted(),
113 },
114 );
115
116 #[cfg(not(feature = "rayon"))]
117 let (sorted_hashed_state, sorted_trie_updates) = (
118 match Arc::try_unwrap(hashed_state) {
119 Ok(state) => state.into_sorted(),
120 Err(arc) => arc.clone_into_sorted(),
121 },
122 match Arc::try_unwrap(trie_updates) {
123 Ok(updates) => updates.into_sorted(),
124 Err(arc) => arc.clone_into_sorted(),
125 },
126 );
127
128 ComputedTrieData::new(Arc::new(sorted_hashed_state), Arc::new(sorted_trie_updates))
129 }
130
131 #[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
133 pub fn wait_cloned(&self) -> ComputedTrieData {
134 let mut state = self.state.lock();
135 match &mut *state {
136 DeferredTrieDataInner::Ready(bundle) => {
137 DEFERRED_TRIE_METRICS.deferred_trie_async_ready.increment(1);
138 bundle.clone()
139 }
140 DeferredTrieDataInner::Pending(maybe_inputs) => {
141 DEFERRED_TRIE_METRICS.deferred_trie_sync_fallback.increment(1);
142
143 let inputs = maybe_inputs.take().expect("inputs must be present in Pending state");
144 let computed = Self::sort(inputs.hashed_state, inputs.trie_updates);
145 *state = DeferredTrieDataInner::Ready(computed.clone());
146
147 computed
148 }
149 }
150 }
151}
152
153impl ComputedTrieData {
154 pub const fn new(
156 hashed_state: Arc<HashedPostStateSorted>,
157 trie_updates: Arc<TrieUpdatesSorted>,
158 ) -> Self {
159 Self { hashed_state, trie_updates }
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{map::B256Map, B256, U256};
    use reth_primitives_traits::Account;
    use reth_trie::{updates::TrieUpdates, HashedStorage};
    use std::{
        thread,
        time::{Duration, Instant},
    };

    /// Pending handle whose inputs are both empty.
    fn empty_pending() -> DeferredTrieData {
        DeferredTrieData::pending(Arc::default(), Arc::default())
    }

    /// A handle built with `ready` hands back data of the same size as the bundle it was given.
    #[test]
    fn ready_returns_immediately() {
        let expected = ComputedTrieData::default();

        let actual = DeferredTrieData::ready(expected.clone()).wait_cloned();

        assert_eq!(actual.hashed_state.total_len(), expected.hashed_state.total_len());
        assert_eq!(actual.trie_updates.total_len(), expected.trie_updates.total_len());
    }

    /// Two sequential waits on a pending handle return the same allocations.
    #[test]
    fn pending_computes_and_caches_result() {
        let handle = empty_pending();

        let first = handle.wait_cloned();
        let second = handle.wait_cloned();

        assert!(Arc::ptr_eq(&first.hashed_state, &second.hashed_state));
        assert!(Arc::ptr_eq(&first.trie_updates, &second.trie_updates));
    }

    /// Waits issued from two threads on clones of one handle return the same allocations.
    #[test]
    fn concurrent_waits_share_computed_result() {
        let local = empty_pending();
        let remote = local.clone();

        let worker = thread::spawn(move || remote.wait_cloned());
        let from_local = local.wait_cloned();
        let from_worker = worker.join().unwrap();

        assert!(Arc::ptr_eq(&from_local.hashed_state, &from_worker.hashed_state));
        assert!(Arc::ptr_eq(&from_local.trie_updates, &from_worker.trie_updates));
    }

    /// One account plus one storage slot yields a sorted state of total length two.
    #[test]
    fn sorts_non_empty_inputs() {
        let address = B256::with_last_byte(1);
        let slot = B256::with_last_byte(2);
        let storage = HashedStorage::from_iter(false, [(slot, U256::from(1))]);
        let hashed_state = HashedPostState::default()
            .with_accounts([(address, Some(Account::default()))])
            .with_storages([(address, storage)]);

        let handle =
            DeferredTrieData::pending(Arc::new(hashed_state), Arc::new(TrieUpdates::default()));
        let sorted = handle.wait_cloned();

        assert_eq!(sorted.hashed_state.total_len(), 2);
        assert_eq!(sorted.trie_updates.total_len(), 0);
    }

    /// Once the first wait has computed the result, a second wait completes quickly.
    #[test]
    fn wait_does_not_block_after_first_compute() {
        let accounts: B256Map<_> = (0..100u8)
            .map(|byte| (B256::with_last_byte(byte), Some(Account::default())))
            .collect();
        let handle = DeferredTrieData::pending(
            Arc::new(HashedPostState { accounts, storages: Default::default() }),
            Arc::new(TrieUpdates::default()),
        );

        // The first call performs the sort and stores the result.
        let _ = handle.wait_cloned();

        let started = Instant::now();
        let _ = handle.wait_cloned();

        assert!(started.elapsed() < Duration::from_millis(10));
    }
}