1use alloy_primitives::B256;
2use parking_lot::Mutex;
3use reth_metrics::{metrics::Counter, Metrics};
4use reth_trie::{
5 updates::{TrieUpdates, TrieUpdatesSorted},
6 HashedPostState, HashedPostStateSorted, TrieInputSorted,
7};
8use std::{
9 fmt,
10 sync::{Arc, LazyLock},
11};
12use tracing::instrument;
13
/// Shared handle to a block's trie data that is either still pending or already computed.
///
/// The state lives behind an `Arc<Mutex<_>>`, so every clone of the handle observes the same
/// state: once one clone has the computed data, all of them do.
#[derive(Clone)]
pub struct DeferredTrieData {
    /// Shared state cell; holds either the raw inputs or the computed result.
    state: Arc<Mutex<DeferredState>>,
}
24
/// Sorted trie data computed for a single block.
///
/// The state and update payloads are held behind [`Arc`]s, so cloning the bundle copies
/// handles rather than the underlying collections.
#[derive(Clone, Debug, Default)]
pub struct ComputedTrieData {
    /// The block's hashed post state in sorted form.
    pub hashed_state: Arc<HashedPostStateSorted>,
    /// The block's trie updates in sorted form.
    pub trie_updates: Arc<TrieUpdatesSorted>,
    /// Trie input overlay together with the hash it is anchored to, when one was built.
    /// `None` for bundles created without a trie input.
    pub anchored_trie_input: Option<AnchoredTrieInput>,
}
37
/// A trie input overlay paired with the hash it was built against.
#[derive(Clone, Debug)]
pub struct AnchoredTrieInput {
    /// Hash identifying the anchor of `trie_input`.
    // NOTE(review): presumably the block the overlay sits on top of — confirm with callers.
    pub anchor_hash: B256,
    /// The sorted trie input overlay.
    pub trie_input: Arc<TrieInputSorted>,
}
48
49#[derive(Metrics)]
51#[metrics(scope = "sync.block_validation")]
52struct DeferredTrieMetrics {
53 deferred_trie_async_ready: Counter,
55 deferred_trie_sync_fallback: Counter,
57}
58
/// Process-wide metrics instance, created via `DeferredTrieMetrics::default` on first access.
static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
    LazyLock::new(DeferredTrieMetrics::default);
61
/// Internal state of a [`DeferredTrieData`] handle.
enum DeferredState {
    /// Nothing has been computed yet; the raw inputs are kept so the data can be built on demand.
    Pending(PendingInputs),
    /// The data has been computed and is returned by cloning.
    Ready(ComputedTrieData),
}
69
/// Raw inputs retained while a [`DeferredTrieData`] is pending.
#[derive(Clone, Debug)]
struct PendingInputs {
    /// The block's hashed post state, not yet sorted.
    hashed_state: Arc<HashedPostState>,
    /// The block's trie updates, not yet sorted.
    trie_updates: Arc<TrieUpdates>,
    /// Anchor hash that is attached to the computed trie input.
    anchor_hash: B256,
    /// Ancestor handles whose data is merged, in this order, before the block's own data.
    ancestors: Vec<DeferredTrieData>,
}
82
83impl fmt::Debug for DeferredTrieData {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 let state = self.state.lock();
86 match &*state {
87 DeferredState::Pending(_) => {
88 f.debug_struct("DeferredTrieData").field("state", &"pending").finish()
89 }
90 DeferredState::Ready(_) => {
91 f.debug_struct("DeferredTrieData").field("state", &"ready").finish()
92 }
93 }
94 }
95}
96
97impl DeferredTrieData {
98 pub fn pending(
109 hashed_state: Arc<HashedPostState>,
110 trie_updates: Arc<TrieUpdates>,
111 anchor_hash: B256,
112 ancestors: Vec<Self>,
113 ) -> Self {
114 Self {
115 state: Arc::new(Mutex::new(DeferredState::Pending(PendingInputs {
116 hashed_state,
117 trie_updates,
118 anchor_hash,
119 ancestors,
120 }))),
121 }
122 }
123
124 pub fn ready(bundle: ComputedTrieData) -> Self {
129 Self { state: Arc::new(Mutex::new(DeferredState::Ready(bundle))) }
130 }
131
132 pub fn sort_and_build_trie_input(
152 hashed_state: &HashedPostState,
153 trie_updates: &TrieUpdates,
154 anchor_hash: B256,
155 ancestors: &[Self],
156 ) -> ComputedTrieData {
157 let sorted_hashed_state = Arc::new(hashed_state.clone_into_sorted());
159 let sorted_trie_updates = Arc::new(trie_updates.clone().into_sorted());
160
161 let mut overlay = TrieInputSorted::default();
163 for ancestor in ancestors {
164 let ancestor_data = ancestor.wait_cloned();
165 {
166 let state_mut = Arc::make_mut(&mut overlay.state);
167 state_mut.extend_ref(ancestor_data.hashed_state.as_ref());
168 }
169 {
170 let nodes_mut = Arc::make_mut(&mut overlay.nodes);
171 nodes_mut.extend_ref(ancestor_data.trie_updates.as_ref());
172 }
173 }
174
175 {
177 let state_mut = Arc::make_mut(&mut overlay.state);
178 state_mut.extend_ref(sorted_hashed_state.as_ref());
179 }
180 {
181 let nodes_mut = Arc::make_mut(&mut overlay.nodes);
182 nodes_mut.extend_ref(sorted_trie_updates.as_ref());
183 }
184
185 ComputedTrieData::with_trie_input(
186 sorted_hashed_state,
187 sorted_trie_updates,
188 anchor_hash,
189 Arc::new(overlay),
190 )
191 }
192
193 #[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
205 pub fn wait_cloned(&self) -> ComputedTrieData {
206 let mut state = self.state.lock();
207 match &*state {
208 DeferredState::Ready(bundle) => {
210 DEFERRED_TRIE_METRICS.deferred_trie_async_ready.increment(1);
211 bundle.clone()
212 }
213 DeferredState::Pending(inputs) => {
216 DEFERRED_TRIE_METRICS.deferred_trie_sync_fallback.increment(1);
217 let computed = Self::sort_and_build_trie_input(
218 &inputs.hashed_state,
219 &inputs.trie_updates,
220 inputs.anchor_hash,
221 &inputs.ancestors,
222 );
223 *state = DeferredState::Ready(computed.clone());
224 computed
225 }
226 }
227 }
228}
229
230impl ComputedTrieData {
231 pub const fn with_trie_input(
233 hashed_state: Arc<HashedPostStateSorted>,
234 trie_updates: Arc<TrieUpdatesSorted>,
235 anchor_hash: B256,
236 trie_input: Arc<TrieInputSorted>,
237 ) -> Self {
238 Self {
239 hashed_state,
240 trie_updates,
241 anchored_trie_input: Some(AnchoredTrieInput { anchor_hash, trie_input }),
242 }
243 }
244
245 pub const fn without_trie_input(
254 hashed_state: Arc<HashedPostStateSorted>,
255 trie_updates: Arc<TrieUpdatesSorted>,
256 ) -> Self {
257 Self { hashed_state, trie_updates, anchored_trie_input: None }
258 }
259
260 pub fn anchor_hash(&self) -> Option<B256> {
262 self.anchored_trie_input.as_ref().map(|anchored| anchored.anchor_hash)
263 }
264
265 pub fn trie_input(&self) -> Option<&Arc<TrieInputSorted>> {
267 self.anchored_trie_input.as_ref().map(|anchored| &anchored.trie_input)
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{map::B256Map, U256};
    use reth_primitives_traits::Account;
    use reth_trie::updates::TrieUpdates;
    use std::{
        sync::Arc,
        thread,
        time::{Duration, Instant},
    };

    /// Bundle with default (empty) sorted state and updates and no trie input.
    fn empty_bundle() -> ComputedTrieData {
        ComputedTrieData::without_trie_input(Arc::default(), Arc::default())
    }

    /// Pending handle with empty inputs, no ancestors and a zero anchor.
    fn empty_pending() -> DeferredTrieData {
        empty_pending_with_anchor(B256::ZERO)
    }

    /// Pending handle with empty inputs, no ancestors and the given anchor.
    fn empty_pending_with_anchor(anchor: B256) -> DeferredTrieData {
        let hashed_state = Arc::new(HashedPostState::default());
        let trie_updates = Arc::new(TrieUpdates::default());
        DeferredTrieData::pending(hashed_state, trie_updates, anchor, Vec::new())
    }

    /// A ready handle hands back its bundle unchanged and quickly.
    #[test]
    fn ready_returns_immediately() {
        let expected = empty_bundle();
        let handle = DeferredTrieData::ready(expected.clone());

        let started = Instant::now();
        let actual = handle.wait_cloned();
        let took = started.elapsed();

        assert_eq!(actual.hashed_state, expected.hashed_state);
        assert_eq!(actual.trie_updates, expected.trie_updates);
        assert_eq!(actual.anchor_hash(), expected.anchor_hash());
        assert!(took < Duration::from_millis(20));
    }

    /// A pending handle computes its data when asked.
    #[test]
    fn pending_computes_fallback() {
        let handle = empty_pending();

        let started = Instant::now();
        let computed = handle.wait_cloned();
        let took = started.elapsed();

        assert!(took < Duration::from_millis(100));
        assert!(computed.hashed_state.is_empty());
    }

    /// The second call returns the same allocations the first call produced.
    #[test]
    fn fallback_result_is_cached() {
        let handle = empty_pending();

        let initial = handle.wait_cloned();
        let repeated = handle.wait_cloned();

        assert!(Arc::ptr_eq(&initial.hashed_state, &repeated.hashed_state));
        assert!(Arc::ptr_eq(&initial.trie_updates, &repeated.trie_updates));
        assert_eq!(initial.anchor_hash(), repeated.anchor_hash());
    }

    /// Ten threads racing on clones of one handle all observe the same computed allocations.
    #[test]
    fn concurrent_wait_cloned_computes_once() {
        let handle = empty_pending();

        let mut workers = Vec::with_capacity(10);
        for _ in 0..10 {
            let shared = handle.clone();
            workers.push(thread::spawn(move || shared.wait_cloned()));
        }

        let outcomes: Vec<_> = workers.into_iter().map(|worker| worker.join().unwrap()).collect();

        let (reference, others) = outcomes.split_first().unwrap();
        for other in others {
            assert!(Arc::ptr_eq(&reference.hashed_state, &other.hashed_state));
            assert!(Arc::ptr_eq(&reference.trie_updates, &other.trie_updates));
        }
    }

    /// The computed data carries the pending handle's own anchor, not the ancestor's.
    #[test]
    fn ancestors_are_merged() {
        let ancestor = DeferredTrieData::ready(ComputedTrieData::with_trie_input(
            Arc::default(),
            Arc::default(),
            B256::with_last_byte(1),
            Arc::new(TrieInputSorted::default()),
        ));

        let handle = DeferredTrieData::pending(
            Arc::new(HashedPostState::default()),
            Arc::new(TrieUpdates::default()),
            B256::with_last_byte(2),
            vec![ancestor],
        );

        let computed = handle.wait_cloned();
        assert_eq!(computed.anchor_hash(), Some(B256::with_last_byte(2)));
    }

    /// When two ancestors touch the same account, the one listed later wins in the overlay.
    #[test]
    fn ancestors_merge_in_chronological_order() {
        let key = B256::with_last_byte(1);

        // Ready ancestor whose state holds a single account at `key` with the given nonce.
        let ready_with_nonce = |nonce: u64| {
            let account = Account { nonce, balance: U256::ZERO, bytecode_hash: None };
            let state = HashedPostStateSorted::new(vec![(key, Some(account))], B256Map::default());
            DeferredTrieData::ready(ComputedTrieData {
                hashed_state: Arc::new(state),
                trie_updates: Arc::default(),
                anchored_trie_input: None,
            })
        };

        let handle = DeferredTrieData::pending(
            Arc::new(HashedPostState::default()),
            Arc::new(TrieUpdates::default()),
            B256::ZERO,
            vec![ready_with_nonce(1), ready_with_nonce(2)],
        );

        let computed = handle.wait_cloned();
        let overlay_accounts = &computed.trie_input().unwrap().state.accounts;
        assert_eq!(overlay_accounts.len(), 1);
        let (_, account) = &overlay_accounts[0];
        assert_eq!(account.unwrap().nonce, 2);
    }
}