1use std::sync::Arc;
4
5use crate::tree::{
6 multiproof::{
7 dispatch_with_chunking, evm_state_to_hashed_post_state, StateRootComputeOutcome,
8 StateRootMessage, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9 },
10 payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19 updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20 TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24 proof_task::{
25 AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26 },
27 root::ParallelStateRootError,
28};
29use reth_trie_sparse::{
30 errors::{SparseStateTrieErrorKind, SparseTrieErrorKind, SparseTrieResult},
31 ConfigurableSparseTrie, DeferredDrops, LeafUpdate, RevealableSparseTrie, SparseStateTrie,
32 SparseTrie,
33};
34use revm_primitives::{hash_map::Entry, B256Map};
35use tracing::{debug, debug_span, error, instrument, trace_span};
36
37const MAX_PENDING_UPDATES: usize = 100;
39
40pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
42 proof_result_tx: CrossbeamSender<ProofResultMessage>,
44 proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
46 updates: CrossbeamReceiver<SparseTrieTaskMessage>,
48 trie: SparseStateTrie<A, S>,
50 parent_state_root: B256,
52 proof_worker_handle: ProofWorkerHandle,
54
55 chunk_size: usize,
58 max_targets_for_chunking: usize,
62
63 account_updates: B256Map<LeafUpdate>,
65 storage_updates: B256Map<B256Map<LeafUpdate>>,
67
68 new_account_updates: B256Map<LeafUpdate>,
70 new_storage_updates: B256Map<B256Map<LeafUpdate>>,
72 pending_account_updates: B256Map<Option<Option<Account>>>,
86 fetched_account_targets: B256Map<u8>,
89 fetched_storage_targets: B256Map<B256Map<u8>>,
92 account_rlp_buf: Vec<u8>,
94 finished_state_updates: bool,
96 account_cache_hits: u64,
98 account_cache_misses: u64,
100 storage_cache_hits: u64,
102 storage_cache_misses: u64,
104 pending_targets: PendingTargets,
106 pending_updates: usize,
109
110 metrics: MultiProofTaskMetrics,
112}
113
114impl<A, S> SparseTrieCacheTask<A, S>
115where
116 A: SparseTrie + Default,
117 S: SparseTrie + Default + Clone,
118{
119 pub(super) fn new_with_trie(
121 executor: &Runtime,
122 updates: CrossbeamReceiver<StateRootMessage>,
123 proof_worker_handle: ProofWorkerHandle,
124 metrics: MultiProofTaskMetrics,
125 trie: SparseStateTrie<A, S>,
126 parent_state_root: B256,
127 chunk_size: usize,
128 ) -> Self {
129 let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
130 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
131
132 let parent_span = tracing::Span::current();
133 let hashing_metrics = metrics.clone();
134 executor.spawn_blocking_named("trie-hashing", move || {
135 let _span = trace_span!(parent: parent_span, "run_hashing_task").entered();
136 Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
137 });
138
139 Self {
140 proof_result_tx,
141 proof_result_rx,
142 updates: hashed_state_rx,
143 proof_worker_handle,
144 trie,
145 parent_state_root,
146 chunk_size,
147 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
148 account_updates: Default::default(),
149 storage_updates: Default::default(),
150 new_account_updates: Default::default(),
151 new_storage_updates: Default::default(),
152 pending_account_updates: Default::default(),
153 fetched_account_targets: Default::default(),
154 fetched_storage_targets: Default::default(),
155 account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
156 finished_state_updates: Default::default(),
157 account_cache_hits: 0,
158 account_cache_misses: 0,
159 storage_cache_hits: 0,
160 storage_cache_misses: 0,
161 pending_targets: Default::default(),
162 pending_updates: Default::default(),
163 metrics,
164 }
165 }
166
167 fn run_hashing_task(
170 updates: CrossbeamReceiver<StateRootMessage>,
171 hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
172 metrics: MultiProofTaskMetrics,
173 ) {
174 let mut total_idle_time = std::time::Duration::ZERO;
175 let mut idle_start = Instant::now();
176
177 while let Ok(message) = updates.recv() {
178 total_idle_time += idle_start.elapsed();
179
180 let msg = match message {
181 StateRootMessage::PrefetchProofs(targets) => {
182 SparseTrieTaskMessage::PrefetchProofs(targets)
183 }
184 StateRootMessage::StateUpdate(_, state) => {
185 let _span = trace_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
186 let hashed = evm_state_to_hashed_post_state(state);
187 SparseTrieTaskMessage::HashedState(hashed)
188 }
189 StateRootMessage::FinishedStateUpdates => {
190 SparseTrieTaskMessage::FinishedStateUpdates
191 }
192 StateRootMessage::BlockAccessList(_) => {
193 idle_start = Instant::now();
194 continue;
195 }
196 StateRootMessage::HashedStateUpdate(state) => {
197 SparseTrieTaskMessage::HashedState(state)
198 }
199 };
200 if hashed_state_tx.send(msg).is_err() {
201 break;
202 }
203
204 idle_start = Instant::now();
205 }
206
207 metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
208 }
209
210 pub(super) fn into_trie_for_reuse(
218 self,
219 max_hot_slots: usize,
220 max_hot_accounts: usize,
221 max_nodes_capacity: usize,
222 max_values_capacity: usize,
223 disable_pruning: bool,
224 updates: &TrieUpdates,
225 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
226 let Self { mut trie, .. } = self;
227 trie.commit_updates(updates);
228 if !disable_pruning {
229 trie.prune(max_hot_slots, max_hot_accounts);
230 trie.shrink_to(max_nodes_capacity, max_values_capacity);
231 }
232 let deferred = trie.take_deferred_drops();
233 (trie, deferred)
234 }
235
236 pub(super) fn into_cleared_trie(
241 self,
242 max_nodes_capacity: usize,
243 max_values_capacity: usize,
244 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
245 let Self { mut trie, .. } = self;
246 trie.clear();
247 trie.shrink_to(max_nodes_capacity, max_values_capacity);
248 let deferred = trie.take_deferred_drops();
249 (trie, deferred)
250 }
251
252 #[instrument(
259 name = "SparseTrieCacheTask::run",
260 level = "debug",
261 target = "engine::tree::payload_processor::sparse_trie",
262 skip_all
263 )]
264 pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
265 let now = Instant::now();
266
267 let mut total_idle_time = std::time::Duration::ZERO;
268 let mut idle_start = Instant::now();
269
270 loop {
271 let mut t = Instant::now();
272 crossbeam_channel::select_biased! {
273 recv(self.updates) -> message => {
274 let wake = Instant::now();
275
276 let update = match message {
277 Ok(m) => m,
278 Err(_) => {
279 return Err(ParallelStateRootError::Other(
280 "updates channel disconnected before state root calculation".to_string(),
281 ))
282 }
283 };
284
285 total_idle_time += wake.duration_since(idle_start);
286 self.metrics
287 .sparse_trie_channel_wait_duration_histogram
288 .record(wake.duration_since(t));
289
290 self.on_message(update);
291 self.pending_updates += 1;
292 }
293 recv(self.proof_result_rx) -> message => {
294 let phase_end = Instant::now();
295 total_idle_time += phase_end.duration_since(idle_start);
296 self.metrics
297 .sparse_trie_channel_wait_duration_histogram
298 .record(phase_end.duration_since(t));
299 t = phase_end;
300
301 let Ok(result) = message else {
302 unreachable!("we own the sender half")
303 };
304
305 let mut result = result.result?;
306 while let Ok(next) = self.proof_result_rx.try_recv() {
307 let res = next.result?;
308 result.extend(res);
309 }
310
311 let phase_end = Instant::now();
312 self.metrics
313 .sparse_trie_proof_coalesce_duration_histogram
314 .record(phase_end.duration_since(t));
315 t = phase_end;
316
317 self.on_proof_result(result)?;
318 self.metrics
319 .sparse_trie_reveal_multiproof_duration_histogram
320 .record(t.elapsed());
321 },
322 }
323
324 if self.updates.is_empty() && self.proof_result_rx.is_empty() {
325 self.dispatch_pending_targets();
328 t = Instant::now();
329 self.process_new_updates()?;
330 self.promote_pending_account_updates()?;
331 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
332
333 if self.finished_state_updates &&
334 self.account_updates.is_empty() &&
335 self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
336 {
337 break;
338 }
339
340 self.dispatch_pending_targets();
341
342 if self.proof_result_rx.is_empty() {
345 self.trie.calculate_subtries();
346 }
347 } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
348 t = Instant::now();
351 self.process_new_updates()?;
352 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
353 self.dispatch_pending_targets();
354 } else if self.pending_targets.len() > self.chunk_size {
355 self.dispatch_pending_targets();
357 }
358
359 idle_start = Instant::now();
360 }
361
362 self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());
363
364 debug!(target: "engine::root", "All proofs processed, ending calculation");
365
366 let start = Instant::now();
367 let (state_root, trie_updates) = match self.trie.root_with_updates() {
368 Ok(result) => result,
369 Err(err)
370 if matches!(
371 err.kind(),
372 SparseStateTrieErrorKind::Sparse(SparseTrieErrorKind::Blind)
373 ) =>
374 {
375 (self.parent_state_root, TrieUpdates::default())
379 }
380 Err(err) => {
381 return Err(ParallelStateRootError::Other(format!(
382 "could not calculate state root: {err:?}"
383 )))
384 }
385 };
386
387 #[cfg(feature = "trie-debug")]
388 let debug_recorders = self.trie.take_debug_recorders();
389
390 let end = Instant::now();
391 self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
392 self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
393
394 self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
395 self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
396 self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
397 self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
398 self.account_cache_hits = 0;
399 self.account_cache_misses = 0;
400 self.storage_cache_hits = 0;
401 self.storage_cache_misses = 0;
402
403 Ok(StateRootComputeOutcome {
404 state_root,
405 trie_updates: Arc::new(trie_updates),
406 #[cfg(feature = "trie-debug")]
407 debug_recorders,
408 })
409 }
410
411 fn on_message(&mut self, message: SparseTrieTaskMessage) {
413 match message {
414 SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
415 SparseTrieTaskMessage::HashedState(hashed_state) => {
416 self.on_hashed_state_update(hashed_state)
417 }
418 SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
419 }
420 }
421
422 #[instrument(
423 level = "trace",
424 target = "engine::tree::payload_processor::sparse_trie",
425 skip_all
426 )]
427 fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
428 for target in targets.account_targets {
429 self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
431 }
432
433 for (address, slots) in targets.storage_targets {
434 if !slots.is_empty() {
435 let new_updates = self.new_storage_updates.entry(address).or_default();
437 for slot in slots {
438 new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
440 }
441 }
442
443 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
446 }
447 }
448
449 #[instrument(
451 level = "trace",
452 target = "engine::tree::payload_processor::sparse_trie",
453 skip_all
454 )]
455 fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
456 for (address, storage) in hashed_state_update.storages {
457 if !storage.storage.is_empty() {
458 let new_updates = self.new_storage_updates.entry(address).or_default();
460 let mut existing_updates = self.storage_updates.get_mut(&address);
461
462 for (slot, value) in storage.storage {
463 self.trie.record_slot_touch(address, slot);
464
465 let encoded = if value.is_zero() {
466 Vec::new()
467 } else {
468 alloy_rlp::encode_fixed_size(&value).to_vec()
469 };
470 new_updates.insert(slot, LeafUpdate::Changed(encoded));
471
472 if let Some(ref mut existing) = existing_updates {
474 existing.remove(&slot);
475 }
476 }
477 }
478
479 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
482
483 self.pending_account_updates.entry(address).or_insert(None);
486 }
487
488 for (address, account) in hashed_state_update.accounts {
489 self.trie.record_account_touch(address);
490
491 self.new_account_updates.insert(address, LeafUpdate::Touched);
496
497 self.pending_account_updates.insert(address, Some(account));
500 }
501 }
502
503 fn on_proof_result(
504 &mut self,
505 result: DecodedMultiProofV2,
506 ) -> Result<(), ParallelStateRootError> {
507 self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
508 ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
509 })
510 }
511
512 fn process_new_updates(&mut self) -> SparseTrieResult<()> {
513 if self.pending_updates == 0 {
514 return Ok(());
515 }
516
517 let _span = debug_span!("process_new_updates").entered();
518 self.pending_updates = 0;
519
520 self.process_leaf_updates(true)?;
522
523 for (address, mut new) in self.new_storage_updates.drain() {
524 match self.storage_updates.entry(address) {
525 Entry::Vacant(entry) => {
526 entry.insert(new); }
528 Entry::Occupied(mut entry) => {
529 let updates = entry.get_mut();
530 for (slot, new) in new.drain() {
531 match updates.entry(slot) {
532 Entry::Occupied(mut slot_entry) => {
533 if new.is_changed() {
534 slot_entry.insert(new);
535 }
536 }
537 Entry::Vacant(slot_entry) => {
538 slot_entry.insert(new);
539 }
540 }
541 }
542 }
543 }
544 }
545
546 for (address, new) in self.new_account_updates.drain() {
547 match self.account_updates.entry(address) {
548 Entry::Occupied(mut entry) => {
549 if new.is_changed() {
550 entry.insert(new);
551 }
552 }
553 Entry::Vacant(entry) => {
554 entry.insert(new);
555 }
556 }
557 }
558
559 Ok(())
560 }
561
562 #[instrument(
565 level = "trace",
566 target = "engine::tree::payload_processor::sparse_trie",
567 skip_all
568 )]
569 fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
570 let storage_updates =
571 if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
572
573 let span = trace_span!("process_storage_leaf_updates").entered();
575 for (address, updates) in storage_updates {
576 if updates.is_empty() {
577 continue;
578 }
579 let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
580
581 let trie = self.trie.get_or_create_storage_trie_mut(*address);
582 let fetched = self.fetched_storage_targets.entry(*address).or_default();
583 let mut targets = Vec::new();
584
585 let updates_len_before = updates.len();
586 trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
587 Entry::Occupied(mut entry) => {
588 if min_len < *entry.get() {
589 entry.insert(min_len);
590 targets.push(ProofV2Target::new(path).with_min_len(min_len));
591 }
592 }
593 Entry::Vacant(entry) => {
594 entry.insert(min_len);
595 targets.push(ProofV2Target::new(path).with_min_len(min_len));
596 }
597 })?;
598 let updates_len_after = updates.len();
599 self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
600 self.storage_cache_misses += updates_len_after as u64;
601
602 if !targets.is_empty() {
603 self.pending_targets.extend_storage_targets(address, targets);
604 }
605 }
606
607 drop(span);
608
609 self.process_account_leaf_updates(new)?;
611
612 Ok(())
613 }
614
615 #[instrument(
619 level = "trace",
620 target = "engine::tree::payload_processor::sparse_trie",
621 skip_all
622 )]
623 fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
624 let account_updates =
625 if new { &mut self.new_account_updates } else { &mut self.account_updates };
626
627 let updates_len_before = account_updates.len();
628
629 self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
630 match self.fetched_account_targets.entry(target) {
631 Entry::Occupied(mut entry) => {
632 if min_len < *entry.get() {
633 entry.insert(min_len);
634 self.pending_targets
635 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
636 }
637 }
638 Entry::Vacant(entry) => {
639 entry.insert(min_len);
640 self.pending_targets
641 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
642 }
643 }
644 })?;
645
646 let updates_len_after = account_updates.len();
647 self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
648 self.account_cache_misses += updates_len_after as u64;
649
650 Ok(updates_len_after < updates_len_before)
651 }
652
653 fn compute_drained_storage_roots(&mut self) {
662 let addresses_to_compute_roots: Vec<_> = self
663 .storage_updates
664 .iter()
665 .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
666 .collect();
667
668 struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
669 unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
672
673 let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
674 Vec::with_capacity(addresses_to_compute_roots.len());
675 for address in addresses_to_compute_roots {
676 if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
677 !trie.is_root_cached()
678 {
679 tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
680 }
681 }
682
683 if tries_to_compute_roots.is_empty() {
684 return;
685 }
686
687 let parent_span =
688 debug_span!("compute_drained_storage_roots", n = tries_to_compute_roots.len());
689 tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
690 let span = if tracing::enabled!(tracing::Level::TRACE) {
691 debug_span!(
692 target: "engine::tree::payload_processor::sparse_trie",
693 parent: &parent_span,
694 "storage_root",
695 ?address
696 )
697 } else {
698 debug_span!(
699 target: "engine::tree::payload_processor::sparse_trie",
700 parent: &parent_span,
701 "storage_root",
702 )
703 };
704 let _enter = span.entered();
705 unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
712 });
713 }
714
715 #[instrument(
719 level = "trace",
720 target = "engine::tree::payload_processor::sparse_trie",
721 skip_all
722 )]
723 fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
724 self.process_leaf_updates(false)?;
725
726 if self.pending_account_updates.is_empty() {
727 return Ok(());
728 }
729
730 self.compute_drained_storage_roots();
731
732 loop {
733 let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
734 let account_rlp_buf = &mut self.account_rlp_buf;
736 let mut num_promoted = 0;
737 self.pending_account_updates.retain(|addr, account| {
738 if let Some(updates) = self.storage_updates.get(addr) {
739 if !updates.is_empty() {
740 return true;
742 } else if let Some(account) = account.take() {
743 let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
744 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
745 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
746 num_promoted += 1;
747 return false;
748 }
749 }
750
751 let trie_account = match self.account_updates.get(addr) {
753 Some(LeafUpdate::Changed(encoded)) => {
754 Some(encoded).filter(|encoded| !encoded.is_empty())
755 }
756 Some(LeafUpdate::Touched) => return true,
758 None => self.trie.get_account_value(addr),
759 };
760
761 let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
762
763 let (account, storage_root) = if let Some(account) = account.take() {
764 let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
769
770 (account, storage_root)
771 } else {
772 (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
773 };
774
775 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
776 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
777 num_promoted += 1;
778
779 false
780 });
781 span.record("promoted", num_promoted);
782 drop(span);
783
784 if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
789 break
790 }
791 }
792
793 Ok(())
794 }
795
796 fn dispatch_pending_targets(&mut self) {
797 if self.pending_targets.is_empty() {
798 return;
799 }
800
801 let _span = trace_span!("dispatch_pending_targets").entered();
802 let (targets, chunking_length) = self.pending_targets.take();
803 dispatch_with_chunking(
804 targets,
805 chunking_length,
806 self.chunk_size,
807 self.max_targets_for_chunking,
808 self.proof_worker_handle.has_multiple_idle_account_workers(),
809 self.proof_worker_handle.has_multiple_idle_storage_workers(),
810 MultiProofTargetsV2::chunks,
811 |proof_targets| {
812 if let Err(e) =
813 self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
814 targets: proof_targets,
815 proof_result_sender: ProofResultContext::new(
816 self.proof_result_tx.clone(),
817 HashedPostState::default(),
818 Instant::now(),
819 ),
820 })
821 {
822 error!("failed to dispatch account multiproof: {e:?}");
823 }
824 },
825 );
826 }
827}
828
829fn encode_account_leaf_value(
831 account: Option<Account>,
832 storage_root: B256,
833 account_rlp_buf: &mut Vec<u8>,
834) -> Vec<u8> {
835 if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
836 return Vec::new();
837 }
838
839 account_rlp_buf.clear();
840 account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
841 account_rlp_buf.clone()
842}
843
844#[derive(Default)]
846struct PendingTargets {
847 targets: MultiProofTargetsV2,
849 len: usize,
851}
852
853impl PendingTargets {
854 const fn len(&self) -> usize {
856 self.len
857 }
858
859 const fn is_empty(&self) -> bool {
861 self.len == 0
862 }
863
864 fn take(&mut self) -> (MultiProofTargetsV2, usize) {
866 (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
867 }
868
869 fn push_account_target(&mut self, target: ProofV2Target) {
871 self.targets.account_targets.push(target);
872 self.len += 1;
873 }
874
875 fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
877 self.len += targets.len();
878 self.targets.storage_targets.entry(*address).or_default().extend(targets);
879 }
880}
881
882enum SparseTrieTaskMessage {
884 HashedState(HashedPostState),
886 PrefetchProofs(MultiProofTargetsV2),
888 FinishedStateUpdates,
890}
891
892#[cfg(test)]
893mod tests {
894 use super::*;
895 use alloy_primitives::{keccak256, Address, B256, U256};
896 use reth_provider::{
897 providers::{OverlayBuilder, OverlayStateProviderFactory},
898 test_utils::create_test_provider_factory,
899 ChainSpecProvider,
900 };
901 use reth_trie_db::ChangesetCache;
902 use reth_trie_parallel::proof_task::ProofTaskCtx;
903 use reth_trie_sparse::ArenaParallelSparseTrie;
904
905 #[test]
906 fn test_run_hashing_task_hashed_state_update_forwards() {
907 let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
908 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
909
910 let address = keccak256(Address::random());
911 let slot = keccak256(U256::from(42).to_be_bytes::<32>());
912 let value = U256::from(999);
913
914 let mut hashed_state = HashedPostState::default();
915 hashed_state.accounts.insert(
916 address,
917 Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
918 );
919 let mut storage = reth_trie::HashedStorage::new(false);
920 storage.storage.insert(slot, value);
921 hashed_state.storages.insert(address, storage);
922
923 let expected_state = hashed_state.clone();
924
925 let handle = std::thread::spawn(move || {
926 SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
927 updates_rx,
928 hashed_state_tx,
929 MultiProofTaskMetrics::default(),
930 );
931 });
932
933 updates_tx.send(StateRootMessage::HashedStateUpdate(hashed_state)).unwrap();
934 updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
935 drop(updates_tx);
936
937 let SparseTrieTaskMessage::HashedState(received) = hashed_state_rx.recv().unwrap() else {
938 panic!("expected HashedState message");
939 };
940
941 let account = received.accounts.get(&address).unwrap().unwrap();
942 assert_eq!(account.balance, expected_state.accounts[&address].unwrap().balance);
943 assert_eq!(account.nonce, expected_state.accounts[&address].unwrap().nonce);
944
945 let storage = received.storages.get(&address).unwrap();
946 assert_eq!(*storage.storage.get(&slot).unwrap(), value);
947
948 let second = hashed_state_rx.recv().unwrap();
949 assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates));
950
951 assert!(hashed_state_rx.recv().is_err());
952 handle.join().unwrap();
953 }
954
955 #[test]
956 fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
957 let mut account_rlp_buf = vec![0xAB];
958 let encoded = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut account_rlp_buf);
959
960 assert!(encoded.is_empty());
961 assert_eq!(account_rlp_buf, vec![0xAB]);
963 }
964
965 #[test]
966 fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
967 let storage_root = B256::from([0x99; 32]);
968 let account = Some(Account {
969 nonce: 7,
970 balance: U256::from(42),
971 bytecode_hash: Some(B256::from([0xAA; 32])),
972 });
973 let mut account_rlp_buf = vec![0x00, 0x01];
974
975 let encoded = encode_account_leaf_value(account, storage_root, &mut account_rlp_buf);
976 let decoded = TrieAccount::decode(&mut &encoded[..]).expect("valid account RLP");
977
978 assert_eq!(decoded.nonce, 7);
979 assert_eq!(decoded.balance, U256::from(42));
980 assert_eq!(decoded.storage_root, storage_root);
981 assert_eq!(account_rlp_buf, encoded);
982 }
983
984 #[test]
985 fn run_returns_parent_root_without_revealing_blind_trie_when_no_state_updates() {
986 let runtime = reth_tasks::Runtime::test();
987 let provider_factory = create_test_provider_factory();
988 let anchor_hash = provider_factory.chain_spec().genesis_hash();
989 let overlay_factory = OverlayStateProviderFactory::new(
990 provider_factory,
991 OverlayBuilder::<reth_chain_state::EthPrimitives>::new(
992 anchor_hash,
993 ChangesetCache::new(),
994 ),
995 );
996 let proof_worker_handle =
997 ProofWorkerHandle::new(&runtime, ProofTaskCtx::new(overlay_factory), false);
998
999 let default_trie = RevealableSparseTrie::blind_from(ConfigurableSparseTrie::Arena(
1000 ArenaParallelSparseTrie::default(),
1001 ));
1002 let trie = SparseStateTrie::default()
1003 .with_accounts_trie(default_trie.clone())
1004 .with_default_storage_trie(default_trie)
1005 .with_updates(true);
1006
1007 let parent_state_root = B256::from([0x55; 32]);
1008 let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
1009 let mut task = SparseTrieCacheTask::new_with_trie(
1010 &runtime,
1011 updates_rx,
1012 proof_worker_handle,
1013 MultiProofTaskMetrics::default(),
1014 trie,
1015 parent_state_root,
1016 1,
1017 );
1018
1019 updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
1020 drop(updates_tx);
1021
1022 let outcome = task.run().expect("state root computation should succeed");
1023
1024 assert_eq!(outcome.state_root, parent_state_root);
1025 assert!(outcome.trie_updates.is_empty());
1026 assert!(task.trie.state_trie_ref().is_none(), "blind trie should not be revealed");
1027 }
1028}