1use reth_metrics::{metrics::Counter, Metrics};
2use reth_trie::{
3 updates::{TrieUpdates, TrieUpdatesSorted},
4 HashedPostState, HashedPostStateSorted,
5};
6use std::{
7 fmt,
8 sync::{Arc, LazyLock, OnceLock},
9};
10use tracing::{debug_span, instrument};
11
12#[derive(Clone)]
18pub struct DeferredTrieData {
19 value: Arc<OnceLock<ComputedTrieData>>,
21}
22
23#[must_use = "DeferredTrieDataProducer must be consumed with compute_and_publish to wake trie data waiters"]
26pub struct DeferredTrieDataProducer {
27 value: Arc<OnceLock<ComputedTrieData>>,
29 inputs: PendingInputs,
31}
32
33impl fmt::Debug for DeferredTrieDataProducer {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.debug_struct("DeferredTrieDataProducer")
36 .field("inputs", &self.inputs)
37 .finish_non_exhaustive()
38 }
39}
40
41impl DeferredTrieDataProducer {
42 pub fn compute_and_publish(self) -> ComputedTrieData {
44 let Self { value, inputs } = self;
45 let computed = DeferredTrieData::sort(inputs.hashed_state, inputs.trie_updates);
46 let _ = value.set(computed.clone());
47 computed
48 }
49}
50
51#[derive(Clone, Debug, Default)]
56pub struct ComputedTrieData {
57 pub hashed_state: Arc<HashedPostStateSorted>,
59 pub trie_updates: Arc<TrieUpdatesSorted>,
61}
62
63#[derive(Metrics)]
65#[metrics(scope = "sync.block_validation")]
66struct DeferredTrieMetrics {
67 deferred_trie_async_ready: Counter,
69 deferred_trie_task_wait: Counter,
71}
72
73static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
74 LazyLock::new(DeferredTrieMetrics::default);
75
76#[derive(Clone, Debug)]
78struct PendingInputs {
79 hashed_state: Arc<HashedPostState>,
81 trie_updates: Arc<TrieUpdates>,
83}
84
85impl fmt::Debug for DeferredTrieData {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 f.debug_struct("DeferredTrieData")
88 .field("state", &if self.value.get().is_some() { "ready" } else { "pending" })
89 .finish()
90 }
91}
92
93impl DeferredTrieData {
94 pub fn pending(
96 hashed_state: Arc<HashedPostState>,
97 trie_updates: Arc<TrieUpdates>,
98 ) -> (Self, DeferredTrieDataProducer) {
99 let value = Arc::new(OnceLock::new());
100 (
101 Self { value: Arc::clone(&value) },
102 DeferredTrieDataProducer {
103 value,
104 inputs: PendingInputs { hashed_state, trie_updates },
105 },
106 )
107 }
108
109 pub fn ready(bundle: ComputedTrieData) -> Self {
111 Self { value: Arc::new(OnceLock::from(bundle)) }
112 }
113
114 pub fn sort(
116 hashed_state: Arc<HashedPostState>,
117 trie_updates: Arc<TrieUpdates>,
118 ) -> ComputedTrieData {
119 let _span = debug_span!(target: "engine::tree::deferred_trie", "sort_inputs").entered();
120
121 #[cfg(feature = "rayon")]
122 let (sorted_hashed_state, sorted_trie_updates) = rayon::join(
123 || match Arc::try_unwrap(hashed_state) {
124 Ok(state) => state.into_sorted(),
125 Err(arc) => arc.clone_into_sorted(),
126 },
127 || match Arc::try_unwrap(trie_updates) {
128 Ok(updates) => updates.into_sorted(),
129 Err(arc) => arc.clone_into_sorted(),
130 },
131 );
132
133 #[cfg(not(feature = "rayon"))]
134 let (sorted_hashed_state, sorted_trie_updates) = (
135 match Arc::try_unwrap(hashed_state) {
136 Ok(state) => state.into_sorted(),
137 Err(arc) => arc.clone_into_sorted(),
138 },
139 match Arc::try_unwrap(trie_updates) {
140 Ok(updates) => updates.into_sorted(),
141 Err(arc) => arc.clone_into_sorted(),
142 },
143 );
144
145 ComputedTrieData::new(Arc::new(sorted_hashed_state), Arc::new(sorted_trie_updates))
146 }
147
148 #[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
150 pub fn wait_cloned(&self) -> ComputedTrieData {
151 let bundle = match self.value.get() {
152 Some(bundle) => {
153 DEFERRED_TRIE_METRICS.deferred_trie_async_ready.increment(1);
154 bundle
155 }
156 None => {
157 DEFERRED_TRIE_METRICS.deferred_trie_task_wait.increment(1);
158 self.value.wait()
159 }
160 };
161
162 bundle.clone()
163 }
164}
165
166impl ComputedTrieData {
167 pub const fn new(
169 hashed_state: Arc<HashedPostStateSorted>,
170 trie_updates: Arc<TrieUpdatesSorted>,
171 ) -> Self {
172 Self { hashed_state, trie_updates }
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use super::*;
179 use alloy_primitives::{map::B256Map, B256, U256};
180 use reth_primitives_traits::Account;
181 use reth_trie::{updates::TrieUpdates, HashedStorage};
182 use std::{
183 thread,
184 time::{Duration, Instant},
185 };
186
187 fn empty_pending() -> (DeferredTrieData, DeferredTrieDataProducer) {
188 DeferredTrieData::pending(
189 Arc::new(HashedPostState::default()),
190 Arc::new(TrieUpdates::default()),
191 )
192 }
193
194 #[test]
195 fn ready_returns_immediately() {
196 let bundle = ComputedTrieData::default();
197 let deferred = DeferredTrieData::ready(bundle.clone());
198
199 let result = deferred.wait_cloned();
200
201 assert_eq!(result.hashed_state.total_len(), bundle.hashed_state.total_len());
202 assert_eq!(result.trie_updates.total_len(), bundle.trie_updates.total_len());
203 }
204
205 #[test]
206 fn pending_waits_for_task_and_caches_result() {
207 let (deferred, task) = empty_pending();
208
209 let published = task.compute_and_publish();
210 let first = deferred.wait_cloned();
211 let second = deferred.wait_cloned();
212
213 assert!(Arc::ptr_eq(&published.hashed_state, &first.hashed_state));
214 assert!(Arc::ptr_eq(&published.trie_updates, &first.trie_updates));
215 assert!(Arc::ptr_eq(&first.hashed_state, &second.hashed_state));
216 assert!(Arc::ptr_eq(&first.trie_updates, &second.trie_updates));
217 }
218
219 #[test]
220 fn pending_wait_blocks_until_task_publishes() {
221 let (deferred, task) = empty_pending();
222
223 let handle = thread::spawn(move || deferred.wait_cloned());
224 thread::sleep(Duration::from_millis(20));
225 assert!(!handle.is_finished());
226
227 let published = task.compute_and_publish();
228 let result = handle.join().unwrap();
229
230 assert!(Arc::ptr_eq(&published.hashed_state, &result.hashed_state));
231 assert!(Arc::ptr_eq(&published.trie_updates, &result.trie_updates));
232 }
233
234 #[test]
235 fn concurrent_waits_share_published_result() {
236 let (deferred, task) = empty_pending();
237 let deferred2 = deferred.clone();
238
239 let handle = thread::spawn(move || deferred2.wait_cloned());
240 let published = task.compute_and_publish();
241 let result1 = deferred.wait_cloned();
242 let result2 = handle.join().unwrap();
243
244 assert!(Arc::ptr_eq(&published.hashed_state, &result1.hashed_state));
245 assert!(Arc::ptr_eq(&published.trie_updates, &result1.trie_updates));
246 assert!(Arc::ptr_eq(&result1.hashed_state, &result2.hashed_state));
247 assert!(Arc::ptr_eq(&result1.trie_updates, &result2.trie_updates));
248 }
249
250 #[test]
251 fn sorts_non_empty_inputs() {
252 let hashed_address = B256::with_last_byte(1);
253 let hashed_slot = B256::with_last_byte(2);
254 let hashed_state = HashedPostState::default()
255 .with_accounts([(hashed_address, Some(Account::default()))])
256 .with_storages([(
257 hashed_address,
258 HashedStorage::from_iter(false, [(hashed_slot, U256::from(1))]),
259 )]);
260
261 let (deferred, task) =
262 DeferredTrieData::pending(Arc::new(hashed_state), Arc::new(TrieUpdates::default()));
263 let _ = task.compute_and_publish();
264 let result = deferred.wait_cloned();
265
266 assert_eq!(result.hashed_state.total_len(), 2);
267 assert_eq!(result.trie_updates.total_len(), 0);
268 }
269
270 #[test]
271 fn wait_does_not_block_after_first_compute() {
272 let mut accounts = B256Map::default();
273 for i in 0..100 {
274 accounts.insert(B256::with_last_byte(i), Some(Account::default()));
275 }
276 let (deferred, task) = DeferredTrieData::pending(
277 Arc::new(HashedPostState { accounts, storages: Default::default() }),
278 Arc::new(TrieUpdates::default()),
279 );
280
281 let _ = task.compute_and_publish();
282 let _ = deferred.wait_cloned();
283 let start = Instant::now();
284 let _ = deferred.wait_cloned();
285
286 assert!(start.elapsed() < Duration::from_millis(10));
287 }
288}