1use std::sync::Arc;
4
5use crate::tree::{
6 multiproof::{
7 dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
8 DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9 },
10 payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19 updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20 TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24 proof_task::{
25 AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26 },
27 root::ParallelStateRootError,
28};
29#[cfg(feature = "trie-debug")]
30use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
31use reth_trie_sparse::{
32 errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
33 RevealableSparseTrie, SparseStateTrie, SparseTrie,
34};
35use revm_primitives::{hash_map::Entry, B256Map};
36use tracing::{debug, debug_span, error, instrument, trace_span};
37
/// Number of received update messages that [`SparseTrieCacheTask::run`] lets accumulate before
/// it applies them to the trie even though more messages are still queued in the updates
/// channel (the check is `pending_updates > MAX_PENDING_UPDATES`).
const MAX_PENDING_UPDATES: usize = 100;
40
41pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
43 proof_result_tx: CrossbeamSender<ProofResultMessage>,
45 proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
47 updates: CrossbeamReceiver<SparseTrieTaskMessage>,
49 trie: SparseStateTrie<A, S>,
51 proof_worker_handle: ProofWorkerHandle,
53
54 chunk_size: usize,
57 max_targets_for_chunking: usize,
61
62 account_updates: B256Map<LeafUpdate>,
64 storage_updates: B256Map<B256Map<LeafUpdate>>,
66
67 new_account_updates: B256Map<LeafUpdate>,
69 new_storage_updates: B256Map<B256Map<LeafUpdate>>,
71 pending_account_updates: B256Map<Option<Option<Account>>>,
85 fetched_account_targets: B256Map<u8>,
88 fetched_storage_targets: B256Map<B256Map<u8>>,
91 account_rlp_buf: Vec<u8>,
93 finished_state_updates: bool,
95 account_cache_hits: u64,
97 account_cache_misses: u64,
99 storage_cache_hits: u64,
101 storage_cache_misses: u64,
103 pending_targets: PendingTargets,
105 pending_updates: usize,
108
109 metrics: MultiProofTaskMetrics,
111}
112
113impl<A, S> SparseTrieCacheTask<A, S>
114where
115 A: SparseTrie + Default,
116 S: SparseTrie + Default + Clone,
117{
118 pub(super) fn new_with_trie(
120 executor: &Runtime,
121 updates: CrossbeamReceiver<MultiProofMessage>,
122 proof_worker_handle: ProofWorkerHandle,
123 metrics: MultiProofTaskMetrics,
124 trie: SparseStateTrie<A, S>,
125 chunk_size: usize,
126 ) -> Self {
127 let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
128 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
129
130 let parent_span = tracing::Span::current();
131 let hashing_metrics = metrics.clone();
132 executor.spawn_blocking_named("trie-hashing", move || {
133 let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
134 Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
135 });
136
137 Self {
138 proof_result_tx,
139 proof_result_rx,
140 updates: hashed_state_rx,
141 proof_worker_handle,
142 trie,
143 chunk_size,
144 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
145 account_updates: Default::default(),
146 storage_updates: Default::default(),
147 new_account_updates: Default::default(),
148 new_storage_updates: Default::default(),
149 pending_account_updates: Default::default(),
150 fetched_account_targets: Default::default(),
151 fetched_storage_targets: Default::default(),
152 account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
153 finished_state_updates: Default::default(),
154 account_cache_hits: 0,
155 account_cache_misses: 0,
156 storage_cache_hits: 0,
157 storage_cache_misses: 0,
158 pending_targets: Default::default(),
159 pending_updates: Default::default(),
160 metrics,
161 }
162 }
163
164 fn run_hashing_task(
167 updates: CrossbeamReceiver<MultiProofMessage>,
168 hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
169 metrics: MultiProofTaskMetrics,
170 ) {
171 let mut total_idle_time = std::time::Duration::ZERO;
172 let mut idle_start = Instant::now();
173
174 while let Ok(message) = updates.recv() {
175 total_idle_time += idle_start.elapsed();
176
177 let msg = match message {
178 MultiProofMessage::PrefetchProofs(targets) => {
179 SparseTrieTaskMessage::PrefetchProofs(targets)
180 }
181 MultiProofMessage::StateUpdate(_, state) => {
182 let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
183 let hashed = evm_state_to_hashed_post_state(state);
184 SparseTrieTaskMessage::HashedState(hashed)
185 }
186 MultiProofMessage::FinishedStateUpdates => {
187 SparseTrieTaskMessage::FinishedStateUpdates
188 }
189 MultiProofMessage::BlockAccessList(_) => {
190 idle_start = Instant::now();
191 continue;
192 }
193 MultiProofMessage::HashedStateUpdate(state) => {
194 SparseTrieTaskMessage::HashedState(state)
195 }
196 };
197 if hashed_state_tx.send(msg).is_err() {
198 break;
199 }
200
201 idle_start = Instant::now();
202 }
203
204 metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
205 }
206
207 pub(super) fn into_trie_for_reuse(
215 self,
216 max_hot_slots: usize,
217 max_hot_accounts: usize,
218 max_nodes_capacity: usize,
219 max_values_capacity: usize,
220 disable_pruning: bool,
221 updates: &TrieUpdates,
222 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
223 let Self { mut trie, .. } = self;
224 trie.commit_updates(updates);
225 if !disable_pruning {
226 trie.prune(max_hot_slots, max_hot_accounts);
227 trie.shrink_to(max_nodes_capacity, max_values_capacity);
228 }
229 let deferred = trie.take_deferred_drops();
230 (trie, deferred)
231 }
232
233 pub(super) fn into_cleared_trie(
238 self,
239 max_nodes_capacity: usize,
240 max_values_capacity: usize,
241 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
242 let Self { mut trie, .. } = self;
243 trie.clear();
244 trie.shrink_to(max_nodes_capacity, max_values_capacity);
245 let deferred = trie.take_deferred_drops();
246 (trie, deferred)
247 }
248
249 #[instrument(
256 name = "SparseTrieCacheTask::run",
257 level = "debug",
258 target = "engine::tree::payload_processor::sparse_trie",
259 skip_all
260 )]
261 pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
262 let now = Instant::now();
263
264 let mut total_idle_time = std::time::Duration::ZERO;
265 let mut idle_start = Instant::now();
266
267 loop {
268 let mut t = Instant::now();
269 crossbeam_channel::select_biased! {
270 recv(self.updates) -> message => {
271 let wake = Instant::now();
272
273 let update = match message {
274 Ok(m) => m,
275 Err(_) => {
276 return Err(ParallelStateRootError::Other(
277 "updates channel disconnected before state root calculation".to_string(),
278 ))
279 }
280 };
281
282 total_idle_time += wake.duration_since(idle_start);
283 self.metrics
284 .sparse_trie_channel_wait_duration_histogram
285 .record(wake.duration_since(t));
286
287 self.on_message(update);
288 self.pending_updates += 1;
289 }
290 recv(self.proof_result_rx) -> message => {
291 let phase_end = Instant::now();
292 total_idle_time += phase_end.duration_since(idle_start);
293 self.metrics
294 .sparse_trie_channel_wait_duration_histogram
295 .record(phase_end.duration_since(t));
296 t = phase_end;
297
298 let Ok(result) = message else {
299 unreachable!("we own the sender half")
300 };
301
302 let mut result = result.result?;
303 while let Ok(next) = self.proof_result_rx.try_recv() {
304 let res = next.result?;
305 result.extend(res);
306 }
307
308 let phase_end = Instant::now();
309 self.metrics
310 .sparse_trie_proof_coalesce_duration_histogram
311 .record(phase_end.duration_since(t));
312 t = phase_end;
313
314 self.on_proof_result(result)?;
315 self.metrics
316 .sparse_trie_reveal_multiproof_duration_histogram
317 .record(t.elapsed());
318 },
319 }
320
321 if self.updates.is_empty() && self.proof_result_rx.is_empty() {
322 self.dispatch_pending_targets();
325 t = Instant::now();
326 self.process_new_updates()?;
327 self.promote_pending_account_updates()?;
328 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
329
330 if self.finished_state_updates &&
331 self.account_updates.is_empty() &&
332 self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
333 {
334 break;
335 }
336
337 self.dispatch_pending_targets();
338
339 if self.proof_result_rx.is_empty() {
342 self.trie.calculate_subtries();
343 }
344 } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
345 t = Instant::now();
348 self.process_new_updates()?;
349 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
350 self.dispatch_pending_targets();
351 } else if self.pending_targets.len() > self.chunk_size {
352 self.dispatch_pending_targets();
354 }
355
356 idle_start = Instant::now();
357 }
358
359 self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());
360
361 debug!(target: "engine::root", "All proofs processed, ending calculation");
362
363 let start = Instant::now();
364 let (state_root, trie_updates) =
365 self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
366 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
367 })?;
368
369 #[cfg(feature = "trie-debug")]
370 let debug_recorders = self.trie.take_debug_recorders();
371
372 let end = Instant::now();
373 self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
374 self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
375
376 self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
377 self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
378 self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
379 self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
380 self.account_cache_hits = 0;
381 self.account_cache_misses = 0;
382 self.storage_cache_hits = 0;
383 self.storage_cache_misses = 0;
384
385 Ok(StateRootComputeOutcome {
386 state_root,
387 trie_updates: Arc::new(trie_updates),
388 #[cfg(feature = "trie-debug")]
389 debug_recorders,
390 })
391 }
392
393 fn on_message(&mut self, message: SparseTrieTaskMessage) {
395 match message {
396 SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
397 SparseTrieTaskMessage::HashedState(hashed_state) => {
398 self.on_hashed_state_update(hashed_state)
399 }
400 SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
401 }
402 }
403
404 #[instrument(
405 level = "trace",
406 target = "engine::tree::payload_processor::sparse_trie",
407 skip_all
408 )]
409 fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
410 for target in targets.account_targets {
411 self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
413 }
414
415 for (address, slots) in targets.storage_targets {
416 if !slots.is_empty() {
417 let new_updates = self.new_storage_updates.entry(address).or_default();
419 for slot in slots {
420 new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
422 }
423 }
424
425 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
428 }
429 }
430
431 #[instrument(
433 level = "trace",
434 target = "engine::tree::payload_processor::sparse_trie",
435 skip_all
436 )]
437 fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
438 for (address, storage) in hashed_state_update.storages {
439 if !storage.storage.is_empty() {
440 let new_updates = self.new_storage_updates.entry(address).or_default();
442 let mut existing_updates = self.storage_updates.get_mut(&address);
443
444 for (slot, value) in storage.storage {
445 self.trie.record_slot_touch(address, slot);
446
447 let encoded = if value.is_zero() {
448 Vec::new()
449 } else {
450 alloy_rlp::encode_fixed_size(&value).to_vec()
451 };
452 new_updates.insert(slot, LeafUpdate::Changed(encoded));
453
454 if let Some(ref mut existing) = existing_updates {
456 existing.remove(&slot);
457 }
458 }
459 }
460
461 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
464
465 self.pending_account_updates.entry(address).or_insert(None);
468 }
469
470 for (address, account) in hashed_state_update.accounts {
471 self.trie.record_account_touch(address);
472
473 self.new_account_updates.insert(address, LeafUpdate::Touched);
478
479 self.pending_account_updates.insert(address, Some(account));
482 }
483 }
484
485 fn on_proof_result(
486 &mut self,
487 result: DecodedMultiProofV2,
488 ) -> Result<(), ParallelStateRootError> {
489 self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
490 ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
491 })
492 }
493
494 fn process_new_updates(&mut self) -> SparseTrieResult<()> {
495 if self.pending_updates == 0 {
496 return Ok(());
497 }
498
499 let _span = debug_span!("process_new_updates").entered();
500 self.pending_updates = 0;
501
502 self.process_leaf_updates(true)?;
504
505 for (address, mut new) in self.new_storage_updates.drain() {
506 match self.storage_updates.entry(address) {
507 Entry::Vacant(entry) => {
508 entry.insert(new); }
510 Entry::Occupied(mut entry) => {
511 let updates = entry.get_mut();
512 for (slot, new) in new.drain() {
513 match updates.entry(slot) {
514 Entry::Occupied(mut slot_entry) => {
515 if new.is_changed() {
516 slot_entry.insert(new);
517 }
518 }
519 Entry::Vacant(slot_entry) => {
520 slot_entry.insert(new);
521 }
522 }
523 }
524 }
525 }
526 }
527
528 for (address, new) in self.new_account_updates.drain() {
529 match self.account_updates.entry(address) {
530 Entry::Occupied(mut entry) => {
531 if new.is_changed() {
532 entry.insert(new);
533 }
534 }
535 Entry::Vacant(entry) => {
536 entry.insert(new);
537 }
538 }
539 }
540
541 Ok(())
542 }
543
544 #[instrument(
547 level = "debug",
548 target = "engine::tree::payload_processor::sparse_trie",
549 skip_all
550 )]
551 fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
552 let storage_updates =
553 if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
554
555 let span = debug_span!("process_storage_leaf_updates").entered();
557 for (address, updates) in storage_updates {
558 if updates.is_empty() {
559 continue;
560 }
561 let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
562
563 let trie = self.trie.get_or_create_storage_trie_mut(*address);
564 let fetched = self.fetched_storage_targets.entry(*address).or_default();
565 let mut targets = Vec::new();
566
567 let updates_len_before = updates.len();
568 trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
569 Entry::Occupied(mut entry) => {
570 if min_len < *entry.get() {
571 entry.insert(min_len);
572 targets.push(ProofV2Target::new(path).with_min_len(min_len));
573 }
574 }
575 Entry::Vacant(entry) => {
576 entry.insert(min_len);
577 targets.push(ProofV2Target::new(path).with_min_len(min_len));
578 }
579 })?;
580 let updates_len_after = updates.len();
581 self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
582 self.storage_cache_misses += updates_len_after as u64;
583
584 if !targets.is_empty() {
585 self.pending_targets.extend_storage_targets(address, targets);
586 }
587 }
588
589 drop(span);
590
591 self.process_account_leaf_updates(new)?;
593
594 Ok(())
595 }
596
597 #[instrument(
601 level = "debug",
602 target = "engine::tree::payload_processor::sparse_trie",
603 skip_all
604 )]
605 fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
606 let account_updates =
607 if new { &mut self.new_account_updates } else { &mut self.account_updates };
608
609 let updates_len_before = account_updates.len();
610
611 self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
612 match self.fetched_account_targets.entry(target) {
613 Entry::Occupied(mut entry) => {
614 if min_len < *entry.get() {
615 entry.insert(min_len);
616 self.pending_targets
617 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
618 }
619 }
620 Entry::Vacant(entry) => {
621 entry.insert(min_len);
622 self.pending_targets
623 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
624 }
625 }
626 })?;
627
628 let updates_len_after = account_updates.len();
629 self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
630 self.account_cache_misses += updates_len_after as u64;
631
632 Ok(updates_len_after < updates_len_before)
633 }
634
635 #[instrument(
644 level = "debug",
645 target = "engine::tree::payload_processor::sparse_trie",
646 skip_all
647 )]
648 fn compute_drained_storage_roots(&mut self) {
649 let addresses_to_compute_roots: Vec<_> = self
650 .storage_updates
651 .iter()
652 .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
653 .collect();
654
655 struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
656 unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
659
660 let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
661 Vec::with_capacity(addresses_to_compute_roots.len());
662 for address in addresses_to_compute_roots {
663 if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
664 !trie.is_root_cached()
665 {
666 tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
667 }
668 }
669
670 let parent_span = tracing::Span::current();
671 tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
672 let _enter = debug_span!(
673 target: "engine::tree::payload_processor::sparse_trie",
674 parent: &parent_span,
675 "storage_root",
676 ?address
677 )
678 .entered();
679 unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
686 });
687 }
688
689 #[instrument(
693 level = "debug",
694 target = "engine::tree::payload_processor::sparse_trie",
695 skip_all
696 )]
697 fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
698 self.process_leaf_updates(false)?;
699
700 if self.pending_account_updates.is_empty() {
701 return Ok(());
702 }
703
704 self.compute_drained_storage_roots();
705
706 loop {
707 let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
708 let account_rlp_buf = &mut self.account_rlp_buf;
710 let mut num_promoted = 0;
711 self.pending_account_updates.retain(|addr, account| {
712 if let Some(updates) = self.storage_updates.get(addr) {
713 if !updates.is_empty() {
714 return true;
716 } else if let Some(account) = account.take() {
717 let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
718 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
719 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
720 num_promoted += 1;
721 return false;
722 }
723 }
724
725 let trie_account = match self.account_updates.get(addr) {
727 Some(LeafUpdate::Changed(encoded)) => {
728 Some(encoded).filter(|encoded| !encoded.is_empty())
729 }
730 Some(LeafUpdate::Touched) => return true,
732 None => self.trie.get_account_value(addr),
733 };
734
735 let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
736
737 let (account, storage_root) = if let Some(account) = account.take() {
738 let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
743
744 (account, storage_root)
745 } else {
746 (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
747 };
748
749 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
750 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
751 num_promoted += 1;
752
753 false
754 });
755 span.record("promoted", num_promoted);
756 drop(span);
757
758 if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
763 break
764 }
765 }
766
767 Ok(())
768 }
769
770 fn dispatch_pending_targets(&mut self) {
771 if self.pending_targets.is_empty() {
772 return;
773 }
774
775 let _span = debug_span!("dispatch_pending_targets").entered();
776 let (targets, chunking_length) = self.pending_targets.take();
777 dispatch_with_chunking(
778 targets,
779 chunking_length,
780 self.chunk_size,
781 self.max_targets_for_chunking,
782 self.proof_worker_handle.available_account_workers(),
783 self.proof_worker_handle.available_storage_workers(),
784 MultiProofTargetsV2::chunks,
785 |proof_targets| {
786 if let Err(e) =
787 self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
788 targets: proof_targets,
789 proof_result_sender: ProofResultContext::new(
790 self.proof_result_tx.clone(),
791 HashedPostState::default(),
792 Instant::now(),
793 ),
794 })
795 {
796 error!("failed to dispatch account multiproof: {e:?}");
797 }
798 },
799 );
800 }
801}
802
803fn encode_account_leaf_value(
805 account: Option<Account>,
806 storage_root: B256,
807 account_rlp_buf: &mut Vec<u8>,
808) -> Vec<u8> {
809 if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
810 return Vec::new();
811 }
812
813 account_rlp_buf.clear();
814 account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
815 account_rlp_buf.clone()
816}
817
818#[derive(Default)]
820struct PendingTargets {
821 targets: MultiProofTargetsV2,
823 len: usize,
825}
826
827impl PendingTargets {
828 const fn len(&self) -> usize {
830 self.len
831 }
832
833 const fn is_empty(&self) -> bool {
835 self.len == 0
836 }
837
838 fn take(&mut self) -> (MultiProofTargetsV2, usize) {
840 (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
841 }
842
843 fn push_account_target(&mut self, target: ProofV2Target) {
845 self.targets.account_targets.push(target);
846 self.len += 1;
847 }
848
849 fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
851 self.len += targets.len();
852 self.targets.storage_targets.entry(*address).or_default().extend(targets);
853 }
854}
855
856enum SparseTrieTaskMessage {
858 HashedState(HashedPostState),
860 PrefetchProofs(MultiProofTargetsV2),
862 FinishedStateUpdates,
864}
865
866#[derive(Debug, Clone)]
869pub struct StateRootComputeOutcome {
870 pub state_root: B256,
872 pub trie_updates: Arc<TrieUpdates>,
874 #[cfg(feature = "trie-debug")]
877 pub debug_recorders: Vec<(Option<B256>, TrieDebugRecorder)>,
878}
879
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, B256, U256};
    use reth_trie_sparse::ArenaParallelSparseTrie;

    /// A `HashedStateUpdate` is forwarded unchanged, followed by the finish marker, and the
    /// output channel closes once the input channel is dropped.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let (input_tx, input_rx) = crossbeam_channel::unbounded();
        let (output_tx, output_rx) = crossbeam_channel::unbounded();

        let hashed_address = keccak256(Address::random());
        let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let slot_value = U256::from(999);
        let sent_account = Account { balance: U256::from(100), nonce: 1, bytecode_hash: None };

        let mut hashed_state = HashedPostState::default();
        hashed_state.accounts.insert(hashed_address, Some(sent_account));
        let mut hashed_storage = reth_trie::HashedStorage::new(false);
        hashed_storage.storage.insert(hashed_slot, slot_value);
        hashed_state.storages.insert(hashed_address, hashed_storage);

        let worker = std::thread::spawn(move || {
            SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
                input_rx,
                output_tx,
                MultiProofTaskMetrics::default(),
            );
        });

        input_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
        input_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap();
        drop(input_tx);

        let SparseTrieTaskMessage::HashedState(received) = output_rx.recv().unwrap() else {
            panic!("expected HashedState message");
        };

        let received_account = received.accounts.get(&hashed_address).unwrap().unwrap();
        assert_eq!(received_account.balance, sent_account.balance);
        assert_eq!(received_account.nonce, sent_account.nonce);

        let received_storage = received.storages.get(&hashed_address).unwrap();
        assert_eq!(*received_storage.storage.get(&hashed_slot).unwrap(), slot_value);

        let finish_marker = output_rx.recv().unwrap();
        assert!(matches!(finish_marker, SparseTrieTaskMessage::FinishedStateUpdates));

        // The task exits after its input is dropped, which closes the output channel.
        assert!(output_rx.recv().is_err());
        worker.join().unwrap();
    }

    /// No account plus the empty storage root encodes to nothing and leaves the buffer alone.
    #[test]
    fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
        let mut scratch = vec![0xAB];
        let leaf = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut scratch);

        assert!(leaf.is_empty());
        assert_eq!(scratch, vec![0xAB]);
    }

    /// A non-empty account is RLP-encoded; the buffer ends up holding exactly the encoding.
    #[test]
    fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
        let storage_root = B256::from([0x99; 32]);
        let account = Account {
            nonce: 7,
            balance: U256::from(42),
            bytecode_hash: Some(B256::from([0xAA; 32])),
        };
        let mut scratch = vec![0x00, 0x01];

        let leaf = encode_account_leaf_value(Some(account), storage_root, &mut scratch);
        let decoded = TrieAccount::decode(&mut &leaf[..]).expect("valid account RLP");

        assert_eq!(decoded.nonce, 7);
        assert_eq!(decoded.balance, U256::from(42));
        assert_eq!(decoded.storage_root, storage_root);
        assert_eq!(scratch, leaf);
    }
}