1use std::sync::Arc;
4
5use crate::tree::{
6 multiproof::{
7 dispatch_with_chunking, evm_state_to_hashed_post_state, StateRootComputeOutcome,
8 StateRootMessage, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9 },
10 payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19 updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20 TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24 proof_task::{
25 AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26 },
27 root::ParallelStateRootError,
28};
29use reth_trie_sparse::{
30 errors::{SparseStateTrieErrorKind, SparseTrieErrorKind, SparseTrieResult},
31 ConfigurableSparseTrie, DeferredDrops, LeafUpdate, RevealableSparseTrie, SparseStateTrie,
32 SparseTrie,
33};
34use revm_primitives::{hash_map::Entry, B256Map};
35use tracing::{debug, debug_span, error, instrument, trace_span};
36
/// Number of buffered update messages above which [`SparseTrieCacheTask::run`] processes the
/// buffered updates even though more messages are still waiting in the updates channel.
const MAX_PENDING_UPDATES: usize = 100;
39
/// Task that applies hashed state updates to a [`SparseStateTrie`], requests the multiproofs
/// needed to reveal missing trie nodes, and finally computes the state root.
///
/// `A` is the sparse trie implementation used for the accounts trie, `S` the one used for
/// storage tries.
pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
    /// Sender handed (cloned) to every dispatched multiproof request so results come back to us.
    proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Receiver for multiproof results, polled in `run`.
    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Receiver for messages produced by the hashing task (`run_hashing_task`).
    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
    /// Sender used once, when `FinishedStateUpdates` arrives, to hand out `final_hashed_state`.
    final_hashed_state_tx: Option<std::sync::mpsc::Sender<HashedPostState>>,
    /// The sparse state trie being revealed and updated.
    trie: SparseStateTrie<A, S>,
    /// State root of the parent; returned by `run` when the root computation reports a blind
    /// trie.
    parent_state_root: B256,
    /// Handle used to dispatch account multiproof requests.
    proof_worker_handle: ProofWorkerHandle,

    /// Chunk size passed to `dispatch_with_chunking`; `run` also dispatches early once more
    /// targets than this are pending.
    chunk_size: usize,
    /// Upper bound passed to `dispatch_with_chunking`; initialized to
    /// `DEFAULT_MAX_TARGETS_FOR_CHUNKING`.
    max_targets_for_chunking: usize,

    /// Account leaf updates carried over from previous rounds that are still to be applied.
    account_updates: B256Map<LeafUpdate>,
    /// Per-account storage leaf updates carried over from previous rounds.
    storage_updates: B256Map<B256Map<LeafUpdate>>,

    /// Account leaf updates received since the last `process_new_updates` call.
    new_account_updates: B256Map<LeafUpdate>,
    /// Storage leaf updates received since the last `process_new_updates` call.
    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
    /// Accounts whose leaf value still has to be encoded and promoted into `account_updates`.
    ///
    /// The outer `Option` is `None` when only the account's storage was updated and
    /// `Some(account)` when account info was received.
    /// NOTE(review): the inner `None` presumably denotes a removed account — confirm.
    pending_account_updates: B256Map<Option<Option<Account>>>,
    /// Smallest `min_len` for which a proof was already requested, per account key.
    fetched_account_targets: B256Map<u8>,
    /// Smallest `min_len` for which a proof was already requested, per account and slot key.
    fetched_storage_targets: B256Map<B256Map<u8>>,
    /// Scratch buffer reused when RLP-encoding account leaf values.
    account_rlp_buf: Vec<u8>,
    /// Set once the `FinishedStateUpdates` message has been received.
    finished_state_updates: bool,
    /// Number of account updates that `update_leaves` consumed from the update map.
    account_cache_hits: u64,
    /// Number of account updates that remained in the update map after `update_leaves`.
    account_cache_misses: u64,
    /// Number of storage updates that `update_leaves` consumed from the update map.
    storage_cache_hits: u64,
    /// Number of storage updates that remained in the update map after `update_leaves`.
    storage_cache_misses: u64,
    /// Proof targets collected but not yet dispatched to the proof workers.
    pending_targets: PendingTargets,
    /// Number of messages received since the last `process_new_updates` call.
    pending_updates: usize,
    /// Accumulation of every hashed state update received so far.
    final_hashed_state: HashedPostState,

    /// Metrics recorded by this task.
    metrics: MultiProofTaskMetrics,
}
121
122impl<A, S> SparseTrieCacheTask<A, S>
123where
124 A: SparseTrie + Default,
125 S: SparseTrie + Default + Clone,
126{
127 #[expect(clippy::too_many_arguments)]
129 pub(super) fn new_with_trie(
130 executor: &Runtime,
131 updates: CrossbeamReceiver<StateRootMessage>,
132 final_hashed_state_tx: std::sync::mpsc::Sender<HashedPostState>,
133 proof_worker_handle: ProofWorkerHandle,
134 metrics: MultiProofTaskMetrics,
135 trie: SparseStateTrie<A, S>,
136 parent_state_root: B256,
137 chunk_size: usize,
138 ) -> Self {
139 let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
140 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
141
142 let parent_span = tracing::Span::current();
143 let hashing_metrics = metrics.clone();
144 executor.spawn_blocking_named("trie-hashing", move || {
145 let _span = trace_span!(parent: parent_span, "run_hashing_task").entered();
146 Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
147 });
148
149 Self {
150 proof_result_tx,
151 proof_result_rx,
152 updates: hashed_state_rx,
153 proof_worker_handle,
154 final_hashed_state_tx: Some(final_hashed_state_tx),
155 trie,
156 parent_state_root,
157 chunk_size,
158 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
159 account_updates: Default::default(),
160 storage_updates: Default::default(),
161 new_account_updates: Default::default(),
162 new_storage_updates: Default::default(),
163 pending_account_updates: Default::default(),
164 fetched_account_targets: Default::default(),
165 fetched_storage_targets: Default::default(),
166 account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
167 finished_state_updates: Default::default(),
168 account_cache_hits: 0,
169 account_cache_misses: 0,
170 storage_cache_hits: 0,
171 storage_cache_misses: 0,
172 pending_targets: Default::default(),
173 pending_updates: Default::default(),
174 final_hashed_state: Default::default(),
175 metrics,
176 }
177 }
178
179 fn run_hashing_task(
182 updates: CrossbeamReceiver<StateRootMessage>,
183 hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
184 metrics: MultiProofTaskMetrics,
185 ) {
186 let mut total_idle_time = std::time::Duration::ZERO;
187 let mut idle_start = Instant::now();
188
189 while let Ok(message) = updates.recv() {
190 total_idle_time += idle_start.elapsed();
191
192 let msg = match message {
193 StateRootMessage::PrefetchProofs(targets) => {
194 SparseTrieTaskMessage::PrefetchProofs(targets)
195 }
196 StateRootMessage::StateUpdate(state) => {
197 let _span = trace_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
198 let hashed = evm_state_to_hashed_post_state(state);
199 SparseTrieTaskMessage::HashedState(hashed)
200 }
201 StateRootMessage::FinishedStateUpdates => {
202 SparseTrieTaskMessage::FinishedStateUpdates
203 }
204 StateRootMessage::BlockAccessList(_) => {
205 idle_start = Instant::now();
206 continue;
207 }
208 StateRootMessage::HashedStateUpdate(state) => {
209 SparseTrieTaskMessage::HashedState(state)
210 }
211 };
212 if hashed_state_tx.send(msg).is_err() {
213 break;
214 }
215
216 idle_start = Instant::now();
217 }
218
219 metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
220 }
221
222 pub(super) fn into_trie_for_reuse(
230 self,
231 max_hot_slots: usize,
232 max_hot_accounts: usize,
233 max_nodes_capacity: usize,
234 max_values_capacity: usize,
235 disable_pruning: bool,
236 updates: &TrieUpdates,
237 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
238 let Self { mut trie, .. } = self;
239 trie.commit_updates(updates);
240 if !disable_pruning {
241 trie.prune(max_hot_slots, max_hot_accounts);
242 trie.shrink_to(max_nodes_capacity, max_values_capacity);
243 }
244 let deferred = trie.take_deferred_drops();
245 (trie, deferred)
246 }
247
248 pub(super) fn into_cleared_trie(
253 self,
254 max_nodes_capacity: usize,
255 max_values_capacity: usize,
256 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
257 let Self { mut trie, .. } = self;
258 trie.clear();
259 trie.shrink_to(max_nodes_capacity, max_values_capacity);
260 let deferred = trie.take_deferred_drops();
261 (trie, deferred)
262 }
263
/// Drives the task until all state updates are applied, then computes the state root.
///
/// The loop waits on two channels: hashed state / prefetch messages (`updates`) and multiproof
/// results (`proof_result_rx`). After every wake-up it decides, based on how much work is
/// still queued, whether to apply buffered updates, promote account updates and/or dispatch
/// pending proof targets. It exits once `FinishedStateUpdates` was seen and no account or
/// storage updates remain.
///
/// # Errors
///
/// Returns [`ParallelStateRootError`] if the updates channel disconnects before the loop
/// finishes, if a proof result carries an error, if a multiproof cannot be revealed, if
/// applying updates fails, or if the final root computation fails with anything other than a
/// blind-trie error.
#[instrument(
    name = "SparseTrieCacheTask::run",
    level = "debug",
    target = "engine::tree::payload_processor::sparse_trie",
    skip_all
)]
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
    let now = Instant::now();

    // Accumulated time spent blocked in the select below.
    let mut total_idle_time = std::time::Duration::ZERO;
    let mut idle_start = Instant::now();

    loop {
        // `t` is a rolling timestamp used to measure the individual phases of one iteration.
        let mut t = Instant::now();
        // Biased select: `updates` is listed first, `proof_result_rx` second.
        crossbeam_channel::select_biased! {
            recv(self.updates) -> message => {
                let wake = Instant::now();

                let update = match message {
                    Ok(m) => m,
                    Err(_) => {
                        return Err(ParallelStateRootError::Other(
                            "updates channel disconnected before state root calculation".to_string(),
                        ))
                    }
                };

                total_idle_time += wake.duration_since(idle_start);
                self.metrics
                    .sparse_trie_channel_wait_duration_histogram
                    .record(wake.duration_since(t));

                self.on_message(update);
                // Counted so that buffered updates get processed in bounded batches.
                self.pending_updates += 1;
            }
            recv(self.proof_result_rx) -> message => {
                let phase_end = Instant::now();
                total_idle_time += phase_end.duration_since(idle_start);
                self.metrics
                    .sparse_trie_channel_wait_duration_histogram
                    .record(phase_end.duration_since(t));
                t = phase_end;

                // `self.proof_result_tx` keeps the channel connected for as long as `self`
                // lives.
                let Ok(result) = message else {
                    unreachable!("we own the sender half")
                };

                // Merge every proof result that is already available into a single
                // multiproof so the trie is revealed once.
                let mut result = result.result?;
                while let Ok(next) = self.proof_result_rx.try_recv() {
                    let res = next.result?;
                    result.extend(res);
                }

                let phase_end = Instant::now();
                self.metrics
                    .sparse_trie_proof_coalesce_duration_histogram
                    .record(phase_end.duration_since(t));
                t = phase_end;

                self.on_proof_result(result)?;
                self.metrics
                    .sparse_trie_reveal_multiproof_duration_histogram
                    .record(t.elapsed());
            },
        }

        if self.updates.is_empty() && self.proof_result_rx.is_empty() {
            // Nothing else is queued: send out outstanding proof requests first, then do the
            // full update pass.
            self.dispatch_pending_targets();
            t = Instant::now();
            self.process_new_updates()?;
            self.promote_pending_account_updates()?;
            self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());

            // Done once no more state is coming and every buffered update has been applied.
            if self.finished_state_updates &&
                self.account_updates.is_empty() &&
                self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
            {
                break;
            }

            // The update pass may have produced new proof targets.
            self.dispatch_pending_targets();

            // Only spend time on subtrie computation while no proof result is waiting.
            if self.proof_result_rx.is_empty() {
                self.trie.calculate_subtries();
            }
        } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
            // Either only proof results are queued, or too many updates have been buffered:
            // apply the new updates and request the proofs they need.
            t = Instant::now();
            self.process_new_updates()?;
            self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
            self.dispatch_pending_targets();
        } else if self.pending_targets.len() > self.chunk_size {
            // More updates are queued, but enough targets piled up to be worth dispatching.
            self.dispatch_pending_targets();
        }

        idle_start = Instant::now();
    }

    self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());

    debug!(target: "engine::root", "All proofs processed, ending calculation");

    let start = Instant::now();
    let (state_root, trie_updates) = match self.trie.root_with_updates() {
        Ok(result) => result,
        // A blind trie at this point is answered with the parent state root and no trie
        // updates instead of an error.
        // NOTE(review): presumably this only happens when no state update touched the trie —
        // confirm.
        Err(err)
            if matches!(
                err.kind(),
                SparseStateTrieErrorKind::Sparse(SparseTrieErrorKind::Blind)
            ) =>
        {
            (self.parent_state_root, TrieUpdates::default())
        }
        Err(err) => {
            return Err(ParallelStateRootError::Other(format!(
                "could not calculate state root: {err:?}"
            )))
        }
    };

    #[cfg(feature = "trie-debug")]
    let debug_recorders = self.trie.take_debug_recorders();

    let end = Instant::now();
    self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
    self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));

    // Report the per-run cache counters, then reset them.
    self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
    self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
    self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
    self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
    self.account_cache_hits = 0;
    self.account_cache_misses = 0;
    self.storage_cache_hits = 0;
    self.storage_cache_misses = 0;

    Ok(StateRootComputeOutcome {
        state_root,
        trie_updates: Arc::new(trie_updates),
        #[cfg(feature = "trie-debug")]
        debug_recorders,
    })
}
422
423 fn on_message(&mut self, message: SparseTrieTaskMessage) {
425 match message {
426 SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
427 SparseTrieTaskMessage::HashedState(hashed_state) => {
428 self.on_hashed_state_update(hashed_state)
429 }
430 SparseTrieTaskMessage::FinishedStateUpdates => {
431 let _ = self
432 .final_hashed_state_tx
433 .take()
434 .unwrap()
435 .send(core::mem::take(&mut self.final_hashed_state));
436 self.finished_state_updates = true
437 }
438 }
439 }
440
441 #[instrument(
442 level = "trace",
443 target = "engine::tree::payload_processor::sparse_trie",
444 skip_all
445 )]
446 fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
447 for target in targets.account_targets {
448 self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
450 }
451
452 for (address, slots) in targets.storage_targets {
453 if !slots.is_empty() {
454 let new_updates = self.new_storage_updates.entry(address).or_default();
456 for slot in slots {
457 new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
459 }
460 }
461
462 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
465 }
466 }
467
468 #[instrument(
470 level = "trace",
471 target = "engine::tree::payload_processor::sparse_trie",
472 skip_all
473 )]
474 fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
475 for (&address, storage) in &hashed_state_update.storages {
476 if !storage.storage.is_empty() {
477 let new_updates = self.new_storage_updates.entry(address).or_default();
479 let mut existing_updates = self.storage_updates.get_mut(&address);
480
481 for (&slot, &value) in &storage.storage {
482 self.trie.record_slot_touch(address, slot);
483
484 let encoded = if value.is_zero() {
485 Vec::new()
486 } else {
487 alloy_rlp::encode_fixed_size(&value).to_vec()
488 };
489 new_updates.insert(slot, LeafUpdate::Changed(encoded));
490
491 if let Some(ref mut existing) = existing_updates {
493 existing.remove(&slot);
494 }
495 }
496 }
497
498 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
501
502 self.pending_account_updates.entry(address).or_insert(None);
505 }
506
507 for (&address, &account) in &hashed_state_update.accounts {
508 self.trie.record_account_touch(address);
509
510 self.new_account_updates.insert(address, LeafUpdate::Touched);
515
516 self.pending_account_updates.insert(address, Some(account));
519 }
520
521 self.final_hashed_state.extend(hashed_state_update);
522 }
523
524 fn on_proof_result(
525 &mut self,
526 result: DecodedMultiProofV2,
527 ) -> Result<(), ParallelStateRootError> {
528 self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
529 ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
530 })
531 }
532
533 fn process_new_updates(&mut self) -> SparseTrieResult<()> {
534 if self.pending_updates == 0 {
535 return Ok(());
536 }
537
538 let _span = debug_span!("process_new_updates").entered();
539 self.pending_updates = 0;
540
541 self.process_leaf_updates(true)?;
543
544 for (address, mut new) in self.new_storage_updates.drain() {
545 match self.storage_updates.entry(address) {
546 Entry::Vacant(entry) => {
547 entry.insert(new); }
549 Entry::Occupied(mut entry) => {
550 let updates = entry.get_mut();
551 for (slot, new) in new.drain() {
552 match updates.entry(slot) {
553 Entry::Occupied(mut slot_entry) => {
554 if new.is_changed() {
555 slot_entry.insert(new);
556 }
557 }
558 Entry::Vacant(slot_entry) => {
559 slot_entry.insert(new);
560 }
561 }
562 }
563 }
564 }
565 }
566
567 for (address, new) in self.new_account_updates.drain() {
568 match self.account_updates.entry(address) {
569 Entry::Occupied(mut entry) => {
570 if new.is_changed() {
571 entry.insert(new);
572 }
573 }
574 Entry::Vacant(entry) => {
575 entry.insert(new);
576 }
577 }
578 }
579
580 Ok(())
581 }
582
583 #[instrument(
586 level = "trace",
587 target = "engine::tree::payload_processor::sparse_trie",
588 skip_all
589 )]
590 fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
591 let storage_updates =
592 if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
593
594 let span = trace_span!("process_storage_leaf_updates").entered();
596 for (address, updates) in storage_updates {
597 if updates.is_empty() {
598 continue;
599 }
600 let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
601
602 let trie = self.trie.get_or_create_storage_trie_mut(*address);
603 let fetched = self.fetched_storage_targets.entry(*address).or_default();
604 let mut targets = Vec::new();
605
606 let updates_len_before = updates.len();
607 trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
608 Entry::Occupied(mut entry) => {
609 if min_len < *entry.get() {
610 entry.insert(min_len);
611 targets.push(ProofV2Target::new(path).with_min_len(min_len));
612 }
613 }
614 Entry::Vacant(entry) => {
615 entry.insert(min_len);
616 targets.push(ProofV2Target::new(path).with_min_len(min_len));
617 }
618 })?;
619 let updates_len_after = updates.len();
620 self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
621 self.storage_cache_misses += updates_len_after as u64;
622
623 if !targets.is_empty() {
624 self.pending_targets.extend_storage_targets(address, targets);
625 }
626 }
627
628 drop(span);
629
630 self.process_account_leaf_updates(new)?;
632
633 Ok(())
634 }
635
636 #[instrument(
640 level = "trace",
641 target = "engine::tree::payload_processor::sparse_trie",
642 skip_all
643 )]
644 fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
645 let account_updates =
646 if new { &mut self.new_account_updates } else { &mut self.account_updates };
647
648 let updates_len_before = account_updates.len();
649
650 self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
651 match self.fetched_account_targets.entry(target) {
652 Entry::Occupied(mut entry) => {
653 if min_len < *entry.get() {
654 entry.insert(min_len);
655 self.pending_targets
656 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
657 }
658 }
659 Entry::Vacant(entry) => {
660 entry.insert(min_len);
661 self.pending_targets
662 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
663 }
664 }
665 })?;
666
667 let updates_len_after = account_updates.len();
668 self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
669 self.account_cache_misses += updates_len_after as u64;
670
671 Ok(updates_len_after < updates_len_before)
672 }
673
/// Computes, in parallel, the roots of all storage tries whose buffered updates have been
/// fully drained and whose root is not cached yet.
///
/// # Panics
///
/// Panics if the root of one of those tries cannot be computed.
fn compute_drained_storage_roots(&mut self) {
    // Accounts whose storage update map exists but is empty, i.e. nothing is left to apply.
    let addresses_to_compute_roots: Vec<_> = self
        .storage_updates
        .iter()
        .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
        .collect();

    // Raw pointers are not `Send`; this wrapper lets them be moved into the rayon workers.
    struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
    // SAFETY: the wrapper is only used below, where every pointer refers to a different
    // storage trie and is dereferenced by exactly one worker.
    unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}

    let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
        Vec::with_capacity(addresses_to_compute_roots.len());
    for address in addresses_to_compute_roots {
        if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
            !trie.is_root_cached()
        {
            tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
        }
    }

    if tries_to_compute_roots.is_empty() {
        return;
    }

    let parent_span =
        debug_span!("compute_drained_storage_roots", n = tries_to_compute_roots.len());
    tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
        // The address is only attached to the span when TRACE level is enabled.
        let span = if tracing::enabled!(tracing::Level::TRACE) {
            debug_span!(
                target: "engine::tree::payload_processor::sparse_trie",
                parent: &parent_span,
                "storage_root",
                ?address
            )
        } else {
            debug_span!(
                target: "engine::tree::payload_processor::sparse_trie",
                parent: &parent_span,
                "storage_root",
            )
        };
        let _enter = span.entered();
        // SAFETY: the addresses are the keys of `storage_updates` and therefore unique, so
        // each pointer was obtained from a different `get_mut` lookup and no two workers
        // touch the same trie. `self` is mutably borrowed for the whole function and the
        // storage trie collection is not accessed between collecting the pointers and the
        // end of this parallel loop.
        // NOTE(review): this relies on distinct keys mapping to non-overlapping tries and on
        // `root()` not reaching into shared state — confirm against `SparseStateTrie`.
        unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
    });
}
735
/// Applies the carried-over leaf updates and turns pending account updates into encoded
/// account leaf updates wherever the account's storage root can be determined.
///
/// A pending account is promoted (moved into `account_updates` as a `Changed` leaf) once its
/// storage updates are drained. Accounts that still have storage updates buffered, or whose
/// account leaf is only `Touched` so far, stay pending. After each promotion pass the account
/// updates are applied to the trie again; the loop repeats as long as a pass promoted
/// something and applying the updates consumed at least one of them.
///
/// # Panics
///
/// Panics if a storage root that is expected to be available cannot be obtained, or if an
/// existing account leaf value is not valid account RLP.
#[instrument(
    level = "trace",
    target = "engine::tree::payload_processor::sparse_trie",
    skip_all
)]
fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
    // Apply what is buffered from earlier rounds; proofs may have arrived since.
    self.process_leaf_updates(false)?;

    if self.pending_account_updates.is_empty() {
        return Ok(());
    }

    // Make sure the storage roots read below are already computed.
    self.compute_drained_storage_roots();

    loop {
        let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
        let account_rlp_buf = &mut self.account_rlp_buf;
        let mut num_promoted = 0;
        // The closure returns `true` to keep an entry pending and `false` once it has been
        // promoted.
        self.pending_account_updates.retain(|addr, account| {
            if let Some(updates) = self.storage_updates.get(addr) {
                if !updates.is_empty() {
                    // Storage updates are still outstanding, the storage root is not final.
                    return true;
                } else if let Some(account) = account.take() {
                    // Account info is known and storage is drained: encode with the
                    // storage trie's root.
                    let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
                    let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
                    self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
                    num_promoted += 1;
                    return false;
                }
            }

            // From here on the current account leaf value is needed: either the one already
            // buffered in `account_updates` or the one stored in the trie.
            let trie_account = match self.account_updates.get(addr) {
                // An empty encoded value is treated like a missing account.
                Some(LeafUpdate::Changed(encoded)) => {
                    Some(encoded).filter(|encoded| !encoded.is_empty())
                }
                // The leaf has not been applied yet; try again in a later pass.
                Some(LeafUpdate::Touched) => return true,
                None => self.trie.get_account_value(addr),
            };

            let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));

            let (account, storage_root) = if let Some(account) = account.take() {
                // Account info changed without drained storage updates being tracked for
                // it: reuse the storage root of the existing leaf, if any.
                let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);

                (account, storage_root)
            } else {
                // Only storage changed: keep the existing account fields and take the root
                // from the storage trie.
                (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
            };

            let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
            self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
            num_promoted += 1;

            false
        });
        span.record("promoted", num_promoted);
        drop(span);

        // Stop when nothing was promoted, or when applying the account updates consumed
        // none of them.
        if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
            break
        }
    }

    Ok(())
}
816
817 fn dispatch_pending_targets(&mut self) {
818 if self.pending_targets.is_empty() {
819 return;
820 }
821
822 let _span = trace_span!("dispatch_pending_targets").entered();
823 let (targets, chunking_length) = self.pending_targets.take();
824 dispatch_with_chunking(
825 targets,
826 chunking_length,
827 self.chunk_size,
828 self.max_targets_for_chunking,
829 self.proof_worker_handle.has_multiple_idle_account_workers(),
830 self.proof_worker_handle.has_multiple_idle_storage_workers(),
831 MultiProofTargetsV2::chunks,
832 |proof_targets| {
833 if let Err(e) =
834 self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
835 targets: proof_targets,
836 proof_result_sender: ProofResultContext::new(
837 self.proof_result_tx.clone(),
838 HashedPostState::default(),
839 Instant::now(),
840 ),
841 })
842 {
843 error!("failed to dispatch account multiproof: {e:?}");
844 }
845 },
846 );
847 }
848}
849
850fn encode_account_leaf_value(
852 account: Option<Account>,
853 storage_root: B256,
854 account_rlp_buf: &mut Vec<u8>,
855) -> Vec<u8> {
856 if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
857 return Vec::new();
858 }
859
860 account_rlp_buf.clear();
861 account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
862 account_rlp_buf.clone()
863}
864
/// Proof targets that were collected but not dispatched yet.
#[derive(Default)]
struct PendingTargets {
    /// The collected account and storage targets.
    targets: MultiProofTargetsV2,
    /// Number of targets added to `targets` since it was last taken.
    len: usize,
}
873
874impl PendingTargets {
875 const fn len(&self) -> usize {
877 self.len
878 }
879
880 const fn is_empty(&self) -> bool {
882 self.len == 0
883 }
884
885 fn take(&mut self) -> (MultiProofTargetsV2, usize) {
887 (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
888 }
889
890 fn push_account_target(&mut self, target: ProofV2Target) {
892 self.targets.account_targets.push(target);
893 self.len += 1;
894 }
895
896 fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
898 self.len += targets.len();
899 self.targets.storage_targets.entry(*address).or_default().extend(targets);
900 }
901}
902
/// Message sent from the hashing task to the sparse trie task.
enum SparseTrieTaskMessage {
    /// A state update with hashed keys, ready to be buffered as leaf updates.
    HashedState(HashedPostState),
    /// Targets whose trie paths should be revealed ahead of time.
    PrefetchProofs(MultiProofTargetsV2),
    /// No more state updates will follow.
    FinishedStateUpdates,
}
912
913#[cfg(test)]
914mod tests {
915 use super::*;
916 use alloy_primitives::{keccak256, Address, B256, U256};
917 use reth_provider::{
918 providers::{OverlayBuilder, OverlayStateProviderFactory},
919 test_utils::create_test_provider_factory,
920 ChainSpecProvider,
921 };
922 use reth_trie_db::ChangesetCache;
923 use reth_trie_parallel::proof_task::ProofTaskCtx;
924 use reth_trie_sparse::ArenaParallelSparseTrie;
925
/// An already hashed state update must be forwarded unchanged, followed by the finish
/// marker, after which the forwarding channel disconnects.
#[test]
fn test_run_hashing_task_hashed_state_update_forwards() {
    let (state_root_tx, state_root_rx) = crossbeam_channel::unbounded();
    let (forward_tx, forward_rx) = crossbeam_channel::unbounded();

    let hashed_address = keccak256(Address::random());
    let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
    let slot_value = U256::from(999);
    let sent_account = Account { balance: U256::from(100), nonce: 1, bytecode_hash: None };

    let mut sent_storage = reth_trie::HashedStorage::new(false);
    sent_storage.storage.insert(hashed_slot, slot_value);

    let mut sent_state = HashedPostState::default();
    sent_state.accounts.insert(hashed_address, Some(sent_account));
    sent_state.storages.insert(hashed_address, sent_storage);

    let worker = std::thread::spawn(move || {
        SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
            state_root_rx,
            forward_tx,
            MultiProofTaskMetrics::default(),
        );
    });

    state_root_tx.send(StateRootMessage::HashedStateUpdate(sent_state)).unwrap();
    state_root_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
    drop(state_root_tx);

    // First message: the hashed state, untouched.
    let SparseTrieTaskMessage::HashedState(forwarded) = forward_rx.recv().unwrap() else {
        panic!("expected HashedState message");
    };

    let forwarded_account = forwarded.accounts.get(&hashed_address).unwrap().unwrap();
    assert_eq!(forwarded_account.balance, sent_account.balance);
    assert_eq!(forwarded_account.nonce, sent_account.nonce);

    let forwarded_storage = forwarded.storages.get(&hashed_address).unwrap();
    assert_eq!(*forwarded_storage.storage.get(&hashed_slot).unwrap(), slot_value);

    // Second message: the finish marker.
    let finish = forward_rx.recv().unwrap();
    assert!(matches!(finish, SparseTrieTaskMessage::FinishedStateUpdates));

    // The worker has exited and dropped its sender.
    assert!(forward_rx.recv().is_err());
    worker.join().unwrap();
}
975
/// A missing account with an empty storage root encodes to an empty value and must not touch
/// the scratch buffer.
#[test]
fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
    let untouched = vec![0xAB];
    let mut scratch = untouched.clone();

    let leaf_value = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut scratch);

    assert!(leaf_value.is_empty());
    assert_eq!(scratch, untouched);
}
985
/// A non-empty account encodes to account RLP carrying the given storage root, and the
/// scratch buffer ends up holding exactly the returned bytes.
#[test]
fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
    let expected_root = B256::from([0x99; 32]);
    let info = Account {
        nonce: 7,
        balance: U256::from(42),
        bytecode_hash: Some(B256::from([0xAA; 32])),
    };
    // Pre-filled to verify the buffer is cleared before encoding.
    let mut scratch = vec![0x00, 0x01];

    let leaf_value = encode_account_leaf_value(Some(info), expected_root, &mut scratch);
    let roundtrip = TrieAccount::decode(&mut &leaf_value[..]).expect("valid account RLP");

    assert_eq!(roundtrip.nonce, 7);
    assert_eq!(roundtrip.balance, U256::from(42));
    assert_eq!(roundtrip.storage_root, expected_root);
    assert_eq!(scratch, leaf_value);
}
1004
/// With no state updates at all, `run` must return the parent state root and empty trie
/// updates, and must leave the blind accounts trie unrevealed.
#[test]
fn run_returns_parent_root_without_revealing_blind_trie_when_no_state_updates() {
    let runtime = reth_tasks::Runtime::test();

    // Proof workers backed by an empty test database.
    let factory = create_test_provider_factory();
    let genesis_hash = factory.chain_spec().genesis_hash();
    let overlay_builder =
        OverlayBuilder::<reth_chain_state::EthPrimitives>::new(genesis_hash, ChangesetCache::new());
    let overlay_factory = OverlayStateProviderFactory::new(factory, overlay_builder);
    let proof_worker_handle =
        ProofWorkerHandle::new(&runtime, ProofTaskCtx::new(overlay_factory), false);

    // A state trie whose accounts trie and default storage trie are both blind.
    let blind_trie = RevealableSparseTrie::blind_from(ConfigurableSparseTrie::Arena(
        ArenaParallelSparseTrie::default(),
    ));
    let state_trie = SparseStateTrie::default()
        .with_accounts_trie(blind_trie.clone())
        .with_default_storage_trie(blind_trie)
        .with_updates(true);

    let parent_root = B256::from([0x55; 32]);
    let (state_root_tx, state_root_rx) = crossbeam_channel::unbounded();
    let mut task = SparseTrieCacheTask::new_with_trie(
        &runtime,
        state_root_rx,
        std::sync::mpsc::channel().0,
        proof_worker_handle,
        MultiProofTaskMetrics::default(),
        state_trie,
        parent_root,
        1,
    );

    // Signal the end of the (empty) update stream right away.
    state_root_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
    drop(state_root_tx);

    let outcome = task.run().expect("state root computation should succeed");

    assert_eq!(outcome.state_root, parent_root);
    assert!(outcome.trie_updates.is_empty());
    assert!(task.trie.state_trie_ref().is_none(), "blind trie should not be revealed");
}
1050}