1use std::sync::Arc;
4
5use crate::tree::{
6 multiproof::{
7 dispatch_with_chunking, evm_state_to_hashed_post_state, StateRootComputeOutcome,
8 StateRootMessage, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9 },
10 payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19 updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20 TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24 proof_task::{
25 AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26 },
27 root::ParallelStateRootError,
28};
29use reth_trie_sparse::{
30 errors::{SparseStateTrieErrorKind, SparseTrieErrorKind, SparseTrieResult},
31 ArenaParallelSparseTrie, DeferredDrops, LeafUpdate, RevealableSparseTrie, SparseStateTrie,
32 SparseTrie,
33};
34use revm_primitives::{hash_map::Entry, B256Map};
35use tracing::{debug, debug_span, error, instrument, trace_span};
36
37pub(super) struct SparseTrieCacheTask<A = ArenaParallelSparseTrie, S = ArenaParallelSparseTrie> {
39 proof_result_tx: CrossbeamSender<ProofResultMessage>,
41 proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
43 updates: CrossbeamReceiver<SparseTrieTaskMessage>,
45 final_hashed_state_tx: Option<std::sync::mpsc::Sender<HashedPostState>>,
47 trie: SparseStateTrie<A, S>,
49 parent_state_root: B256,
51 proof_worker_handle: ProofWorkerHandle,
53
54 chunk_size: usize,
57 max_targets_for_chunking: usize,
61
62 account_updates: B256Map<LeafUpdate>,
64 storage_updates: B256Map<B256Map<LeafUpdate>>,
66
67 new_account_updates: B256Map<LeafUpdate>,
69 new_storage_updates: B256Map<B256Map<LeafUpdate>>,
71 pending_account_updates: B256Map<Option<Option<Account>>>,
85 fetched_account_targets: B256Map<u8>,
88 fetched_storage_targets: B256Map<B256Map<u8>>,
91 account_rlp_buf: Vec<u8>,
93 finished_state_updates: bool,
95 account_cache_hits: u64,
97 account_cache_misses: u64,
99 storage_cache_hits: u64,
101 storage_cache_misses: u64,
103 pending_targets: PendingTargets,
105 pending_updates: usize,
108 final_hashed_state: HashedPostState,
114
115 metrics: MultiProofTaskMetrics,
117}
118
119impl<A, S> SparseTrieCacheTask<A, S>
120where
121 A: SparseTrie + Default,
122 S: SparseTrie + Default + Clone,
123{
124 #[expect(clippy::too_many_arguments)]
126 pub(super) fn new_with_trie(
127 executor: &Runtime,
128 updates: CrossbeamReceiver<StateRootMessage>,
129 final_hashed_state_tx: std::sync::mpsc::Sender<HashedPostState>,
130 proof_worker_handle: ProofWorkerHandle,
131 metrics: MultiProofTaskMetrics,
132 trie: SparseStateTrie<A, S>,
133 parent_state_root: B256,
134 chunk_size: usize,
135 ) -> Self {
136 let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
137 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
138
139 let parent_span = tracing::Span::current();
140 let hashing_metrics = metrics.clone();
141 executor.spawn_blocking_named("trie-hashing", move || {
142 let _span = trace_span!(parent: parent_span, "run_hashing_task").entered();
143 Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
144 });
145
146 Self {
147 proof_result_tx,
148 proof_result_rx,
149 updates: hashed_state_rx,
150 proof_worker_handle,
151 final_hashed_state_tx: Some(final_hashed_state_tx),
152 trie,
153 parent_state_root,
154 chunk_size,
155 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
156 account_updates: Default::default(),
157 storage_updates: Default::default(),
158 new_account_updates: Default::default(),
159 new_storage_updates: Default::default(),
160 pending_account_updates: Default::default(),
161 fetched_account_targets: Default::default(),
162 fetched_storage_targets: Default::default(),
163 account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
164 finished_state_updates: Default::default(),
165 account_cache_hits: 0,
166 account_cache_misses: 0,
167 storage_cache_hits: 0,
168 storage_cache_misses: 0,
169 pending_targets: Default::default(),
170 pending_updates: Default::default(),
171 final_hashed_state: Default::default(),
172 metrics,
173 }
174 }
175
176 fn run_hashing_task(
179 updates: CrossbeamReceiver<StateRootMessage>,
180 hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
181 metrics: MultiProofTaskMetrics,
182 ) {
183 let mut total_idle_time = std::time::Duration::ZERO;
184 let mut idle_start = Instant::now();
185
186 while let Ok(message) = updates.recv() {
187 total_idle_time += idle_start.elapsed();
188
189 let msg = match message {
190 StateRootMessage::PrefetchProofs(targets) => {
191 SparseTrieTaskMessage::PrefetchProofs(targets)
192 }
193 StateRootMessage::StateUpdate(state) => {
194 let _span = trace_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
195 let hashed = evm_state_to_hashed_post_state(state);
196 SparseTrieTaskMessage::HashedState(hashed)
197 }
198 StateRootMessage::FinishedStateUpdates => {
199 SparseTrieTaskMessage::FinishedStateUpdates
200 }
201 StateRootMessage::BlockAccessList(_) => {
202 idle_start = Instant::now();
203 continue;
204 }
205 StateRootMessage::HashedStateUpdate(state) => {
206 SparseTrieTaskMessage::HashedState(state)
207 }
208 };
209 if hashed_state_tx.send(msg).is_err() {
210 break;
211 }
212
213 idle_start = Instant::now();
214 }
215
216 metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
217 }
218
219 pub(super) fn into_trie_for_reuse(
226 self,
227 max_hot_slots: usize,
228 max_hot_accounts: usize,
229 disable_pruning: bool,
230 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
231 let Self { mut trie, .. } = self;
232 if !disable_pruning {
233 trie.prune(max_hot_slots, max_hot_accounts);
234 }
235 let deferred = trie.take_deferred_drops();
236 (trie, deferred)
237 }
238
239 pub(super) fn into_cleared_trie(self) -> (SparseStateTrie<A, S>, DeferredDrops) {
244 let Self { mut trie, .. } = self;
245 trie.clear();
246 let deferred = trie.take_deferred_drops();
247 (trie, deferred)
248 }
249
250 #[instrument(
257 name = "SparseTrieCacheTask::run",
258 level = "debug",
259 target = "engine::tree::payload_processor::sparse_trie",
260 skip_all
261 )]
262 pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
263 let now = Instant::now();
264
265 let mut total_idle_time = std::time::Duration::ZERO;
266 let mut idle_start = Instant::now();
267
268 loop {
269 let mut t = Instant::now();
270 crossbeam_channel::select_biased! {
271 recv(self.updates) -> message => {
272 let wake = Instant::now();
273
274 let update = match message {
275 Ok(m) => m,
276 Err(_) => {
277 return Err(ParallelStateRootError::Other(
278 "updates channel disconnected before state root calculation".to_string(),
279 ))
280 }
281 };
282
283 total_idle_time += wake.duration_since(idle_start);
284 self.metrics
285 .sparse_trie_channel_wait_duration_histogram
286 .record(wake.duration_since(t));
287
288 self.on_message(update);
289 self.pending_updates += 1;
290 }
291 recv(self.proof_result_rx) -> message => {
292 let phase_end = Instant::now();
293 total_idle_time += phase_end.duration_since(idle_start);
294 self.metrics
295 .sparse_trie_channel_wait_duration_histogram
296 .record(phase_end.duration_since(t));
297 t = phase_end;
298
299 let Ok(result) = message else {
300 unreachable!("we own the sender half")
301 };
302
303 let mut result = result.result?;
304 while let Ok(next) = self.proof_result_rx.try_recv() {
305 let res = next.result?;
306 result.extend(res);
307 }
308
309 let phase_end = Instant::now();
310 self.metrics
311 .sparse_trie_proof_coalesce_duration_histogram
312 .record(phase_end.duration_since(t));
313 t = phase_end;
314
315 self.on_proof_result(result)?;
316 self.metrics
317 .sparse_trie_reveal_multiproof_duration_histogram
318 .record(t.elapsed());
319 },
320 }
321
322 if self.updates.is_empty() && self.proof_result_rx.is_empty() {
323 self.dispatch_pending_targets();
326 t = Instant::now();
327 self.process_new_updates()?;
328 self.promote_pending_account_updates()?;
329 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
330
331 if self.finished_state_updates &&
332 self.account_updates.is_empty() &&
333 self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
334 {
335 break;
336 }
337
338 self.dispatch_pending_targets();
339
340 if self.proof_result_rx.is_empty() {
343 self.trie.calculate_subtries();
344 }
345 } else if self.updates.is_empty() {
346 t = Instant::now();
348 self.process_new_updates()?;
349 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
350 self.dispatch_pending_targets();
351 } else if self.pending_targets.len() > self.chunk_size {
352 self.dispatch_pending_targets();
354 }
355
356 idle_start = Instant::now();
357 }
358
359 self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());
360
361 debug!(target: "engine::root", "All proofs processed, ending calculation");
362
363 let start = Instant::now();
364 let (state_root, trie_updates) = match self.trie.root_with_updates() {
365 Ok(result) => result,
366 Err(err)
367 if matches!(
368 err.kind(),
369 SparseStateTrieErrorKind::Sparse(SparseTrieErrorKind::Blind)
370 ) =>
371 {
372 (self.parent_state_root, TrieUpdates::default())
376 }
377 Err(err) => {
378 return Err(ParallelStateRootError::Other(format!(
379 "could not calculate state root: {err:?}"
380 )))
381 }
382 };
383
384 #[cfg(feature = "trie-debug")]
385 let debug_recorders = self.trie.take_debug_recorders();
386
387 let end = Instant::now();
388 self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
389 self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
390
391 self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
392 self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
393 self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
394 self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
395 self.account_cache_hits = 0;
396 self.account_cache_misses = 0;
397 self.storage_cache_hits = 0;
398 self.storage_cache_misses = 0;
399
400 Ok(StateRootComputeOutcome {
401 state_root,
402 trie_updates: Arc::new(trie_updates),
403 #[cfg(feature = "trie-debug")]
404 debug_recorders,
405 })
406 }
407
408 fn on_message(&mut self, message: SparseTrieTaskMessage) {
410 match message {
411 SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
412 SparseTrieTaskMessage::HashedState(hashed_state) => {
413 self.on_hashed_state_update(hashed_state)
414 }
415 SparseTrieTaskMessage::FinishedStateUpdates => {
416 let _ = self
417 .final_hashed_state_tx
418 .take()
419 .unwrap()
420 .send(core::mem::take(&mut self.final_hashed_state));
421 self.finished_state_updates = true
422 }
423 }
424 }
425
426 #[instrument(
427 level = "trace",
428 target = "engine::tree::payload_processor::sparse_trie",
429 skip_all
430 )]
431 fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
432 for target in targets.account_targets {
433 self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
435 }
436
437 for (address, slots) in targets.storage_targets {
438 if !slots.is_empty() {
439 let new_updates = self.new_storage_updates.entry(address).or_default();
441 for slot in slots {
442 new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
444 }
445 }
446
447 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
450 }
451 }
452
453 #[instrument(
455 level = "trace",
456 target = "engine::tree::payload_processor::sparse_trie",
457 skip_all
458 )]
459 fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
460 for (&address, storage) in &hashed_state_update.storages {
461 if !storage.storage.is_empty() {
462 let new_updates = self.new_storage_updates.entry(address).or_default();
464 let mut existing_updates = self.storage_updates.get_mut(&address);
465
466 for (&slot, &value) in &storage.storage {
467 self.trie.record_slot_touch(address, slot);
468
469 let encoded = if value.is_zero() {
470 Vec::new()
471 } else {
472 alloy_rlp::encode_fixed_size(&value).to_vec()
473 };
474 new_updates.insert(slot, LeafUpdate::Changed(encoded));
475
476 if let Some(ref mut existing) = existing_updates {
478 existing.remove(&slot);
479 }
480 }
481 }
482
483 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
486
487 self.pending_account_updates.entry(address).or_insert(None);
490 }
491
492 for (&address, &account) in &hashed_state_update.accounts {
493 self.trie.record_account_touch(address);
494
495 self.new_account_updates.insert(address, LeafUpdate::Touched);
500
501 self.pending_account_updates.insert(address, Some(account));
504 }
505
506 self.final_hashed_state.extend(hashed_state_update);
507 }
508
509 fn on_proof_result(
510 &mut self,
511 result: DecodedMultiProofV2,
512 ) -> Result<(), ParallelStateRootError> {
513 self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
514 ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
515 })
516 }
517
518 fn process_new_updates(&mut self) -> SparseTrieResult<()> {
519 if self.pending_updates == 0 {
520 return Ok(());
521 }
522
523 let _span = debug_span!("process_new_updates").entered();
524 self.pending_updates = 0;
525
526 self.process_leaf_updates(true)?;
528
529 for (address, mut new) in self.new_storage_updates.drain() {
530 match self.storage_updates.entry(address) {
531 Entry::Vacant(entry) => {
532 entry.insert(new); }
534 Entry::Occupied(mut entry) => {
535 let updates = entry.get_mut();
536 for (slot, new) in new.drain() {
537 match updates.entry(slot) {
538 Entry::Occupied(mut slot_entry) => {
539 if new.is_changed() {
540 slot_entry.insert(new);
541 }
542 }
543 Entry::Vacant(slot_entry) => {
544 slot_entry.insert(new);
545 }
546 }
547 }
548 }
549 }
550 }
551
552 for (address, new) in self.new_account_updates.drain() {
553 match self.account_updates.entry(address) {
554 Entry::Occupied(mut entry) => {
555 if new.is_changed() {
556 entry.insert(new);
557 }
558 }
559 Entry::Vacant(entry) => {
560 entry.insert(new);
561 }
562 }
563 }
564
565 Ok(())
566 }
567
568 #[instrument(
571 level = "trace",
572 target = "engine::tree::payload_processor::sparse_trie",
573 skip_all
574 )]
575 fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
576 let storage_updates =
577 if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
578
579 let span = trace_span!("process_storage_leaf_updates").entered();
581 for (address, updates) in storage_updates {
582 if updates.is_empty() {
583 continue;
584 }
585 let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
586
587 let trie = self.trie.get_or_create_storage_trie_mut(*address);
588 let fetched = self.fetched_storage_targets.entry(*address).or_default();
589 let mut targets = Vec::new();
590
591 let updates_len_before = updates.len();
592 trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
593 Entry::Occupied(mut entry) => {
594 if min_len < *entry.get() {
595 entry.insert(min_len);
596 targets.push(ProofV2Target::new(path).with_min_len(min_len));
597 }
598 }
599 Entry::Vacant(entry) => {
600 entry.insert(min_len);
601 targets.push(ProofV2Target::new(path).with_min_len(min_len));
602 }
603 })?;
604 let updates_len_after = updates.len();
605 self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
606 self.storage_cache_misses += updates_len_after as u64;
607
608 if !targets.is_empty() {
609 self.pending_targets.extend_storage_targets(address, targets);
610 }
611 }
612
613 drop(span);
614
615 self.process_account_leaf_updates(new)?;
617
618 Ok(())
619 }
620
621 #[instrument(
625 level = "trace",
626 target = "engine::tree::payload_processor::sparse_trie",
627 skip_all
628 )]
629 fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
630 let account_updates =
631 if new { &mut self.new_account_updates } else { &mut self.account_updates };
632
633 let updates_len_before = account_updates.len();
634
635 self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
636 match self.fetched_account_targets.entry(target) {
637 Entry::Occupied(mut entry) => {
638 if min_len < *entry.get() {
639 entry.insert(min_len);
640 self.pending_targets
641 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
642 }
643 }
644 Entry::Vacant(entry) => {
645 entry.insert(min_len);
646 self.pending_targets
647 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
648 }
649 }
650 })?;
651
652 let updates_len_after = account_updates.len();
653 self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
654 self.account_cache_misses += updates_len_after as u64;
655
656 Ok(updates_len_after < updates_len_before)
657 }
658
659 fn compute_drained_storage_roots(&mut self) {
668 let addresses_to_compute_roots: Vec<_> = self
669 .storage_updates
670 .iter()
671 .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
672 .collect();
673
674 struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
675 unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
678
679 let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
680 Vec::with_capacity(addresses_to_compute_roots.len());
681 for address in addresses_to_compute_roots {
682 if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
683 !trie.is_root_cached()
684 {
685 tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
686 }
687 }
688
689 if tries_to_compute_roots.is_empty() {
690 return;
691 }
692
693 let parent_span =
694 debug_span!("compute_drained_storage_roots", n = tries_to_compute_roots.len());
695 tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
696 let span = if tracing::enabled!(tracing::Level::TRACE) {
697 debug_span!(
698 target: "engine::tree::payload_processor::sparse_trie",
699 parent: &parent_span,
700 "storage_root",
701 ?address
702 )
703 } else {
704 debug_span!(
705 target: "engine::tree::payload_processor::sparse_trie",
706 parent: &parent_span,
707 "storage_root",
708 )
709 };
710 let _enter = span.entered();
711 unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
718 });
719 }
720
721 #[instrument(
725 level = "trace",
726 target = "engine::tree::payload_processor::sparse_trie",
727 skip_all
728 )]
729 fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
730 self.process_leaf_updates(false)?;
731
732 if self.pending_account_updates.is_empty() {
733 return Ok(());
734 }
735
736 self.compute_drained_storage_roots();
737
738 loop {
739 let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
740 let account_rlp_buf = &mut self.account_rlp_buf;
742 let mut num_promoted = 0;
743 self.pending_account_updates.retain(|addr, account| {
744 if let Some(updates) = self.storage_updates.get(addr) {
745 if !updates.is_empty() {
746 return true;
748 } else if let Some(account) = account.take() {
749 let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
750 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
751 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
752 num_promoted += 1;
753 return false;
754 }
755 }
756
757 let trie_account = match self.account_updates.get(addr) {
759 Some(LeafUpdate::Changed(encoded)) => {
760 Some(encoded).filter(|encoded| !encoded.is_empty())
761 }
762 Some(LeafUpdate::Touched) => return true,
764 None => self.trie.get_account_value(addr),
765 };
766
767 let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
768
769 let (account, storage_root) = if let Some(account) = account.take() {
770 let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
775
776 (account, storage_root)
777 } else {
778 (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
779 };
780
781 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
782 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
783 num_promoted += 1;
784
785 false
786 });
787 span.record("promoted", num_promoted);
788 drop(span);
789
790 if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
795 break
796 }
797 }
798
799 Ok(())
800 }
801
802 fn dispatch_pending_targets(&mut self) {
803 if self.pending_targets.is_empty() {
804 return;
805 }
806
807 let _span = trace_span!("dispatch_pending_targets").entered();
808 let (targets, chunking_length) = self.pending_targets.take();
809 dispatch_with_chunking(
810 targets,
811 chunking_length,
812 self.chunk_size,
813 self.max_targets_for_chunking,
814 self.proof_worker_handle.has_multiple_idle_account_workers(),
815 self.proof_worker_handle.has_multiple_idle_storage_workers(),
816 MultiProofTargetsV2::chunks,
817 |proof_targets| {
818 if let Err(e) =
819 self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
820 targets: proof_targets,
821 proof_result_sender: ProofResultContext::new(
822 self.proof_result_tx.clone(),
823 HashedPostState::default(),
824 Instant::now(),
825 ),
826 })
827 {
828 error!("failed to dispatch account multiproof: {e:?}");
829 }
830 },
831 );
832 }
833}
834
835fn encode_account_leaf_value(
837 account: Option<Account>,
838 storage_root: B256,
839 account_rlp_buf: &mut Vec<u8>,
840) -> Vec<u8> {
841 if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
842 return Vec::new();
843 }
844
845 account_rlp_buf.clear();
846 account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
847 account_rlp_buf.clone()
848}
849
850#[derive(Default)]
852struct PendingTargets {
853 targets: MultiProofTargetsV2,
855 len: usize,
857}
858
859impl PendingTargets {
860 const fn len(&self) -> usize {
862 self.len
863 }
864
865 const fn is_empty(&self) -> bool {
867 self.len == 0
868 }
869
870 fn take(&mut self) -> (MultiProofTargetsV2, usize) {
872 (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
873 }
874
875 fn push_account_target(&mut self, target: ProofV2Target) {
877 self.targets.account_targets.push(target);
878 self.len += 1;
879 }
880
881 fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
883 self.len += targets.len();
884 self.targets.storage_targets.entry(*address).or_default().extend(targets);
885 }
886}
887
888enum SparseTrieTaskMessage {
890 HashedState(HashedPostState),
892 PrefetchProofs(MultiProofTargetsV2),
894 FinishedStateUpdates,
896}
897
898#[cfg(test)]
899mod tests {
900 use super::*;
901 use alloy_primitives::{keccak256, Address, B256, U256};
902 use reth_provider::{
903 providers::{OverlayBuilder, OverlayStateProviderFactory},
904 test_utils::create_test_provider_factory,
905 ChainSpecProvider,
906 };
907 use reth_trie_db::ChangesetCache;
908 use reth_trie_parallel::proof_task::ProofTaskCtx;
909 use reth_trie_sparse::ArenaParallelSparseTrie;
910
/// An already-hashed state update must be forwarded unchanged, followed by the
/// end-of-updates message, after which the output channel closes.
#[test]
fn test_run_hashing_task_hashed_state_update_forwards() {
    let hashed_address = keccak256(Address::random());
    let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
    let slot_value = U256::from(999);

    // One account with one storage slot.
    let mut storage = reth_trie::HashedStorage::new(false);
    storage.storage.insert(hashed_slot, slot_value);
    let mut hashed_state = HashedPostState::default();
    hashed_state.accounts.insert(
        hashed_address,
        Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
    );
    hashed_state.storages.insert(hashed_address, storage);

    let expected_state = hashed_state.clone();

    let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
    let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
    let worker = std::thread::spawn(move || {
        SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
            updates_rx,
            hashed_state_tx,
            MultiProofTaskMetrics::default(),
        );
    });

    updates_tx.send(StateRootMessage::HashedStateUpdate(hashed_state)).unwrap();
    updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
    drop(updates_tx);

    let first = hashed_state_rx.recv().unwrap();
    let SparseTrieTaskMessage::HashedState(received) = first else {
        panic!("expected HashedState message");
    };

    let forwarded_account = received.accounts.get(&hashed_address).unwrap().unwrap();
    let expected_account = expected_state.accounts[&hashed_address].unwrap();
    assert_eq!(forwarded_account.balance, expected_account.balance);
    assert_eq!(forwarded_account.nonce, expected_account.nonce);

    let forwarded_storage = received.storages.get(&hashed_address).unwrap();
    assert_eq!(*forwarded_storage.storage.get(&hashed_slot).unwrap(), slot_value);

    let second = hashed_state_rx.recv().unwrap();
    assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates));

    // The worker exits once its input is closed, which closes the output channel too.
    assert!(hashed_state_rx.recv().is_err());
    worker.join().unwrap();
}
960
/// No account plus the empty storage root encodes to an empty value and leaves the scratch
/// buffer untouched.
#[test]
fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
    let mut scratch = vec![0xAB];

    let encoded = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut scratch);

    assert!(encoded.is_empty());
    assert_eq!(scratch, vec![0xAB]);
}
970
/// A non-empty account encodes to decodable account RLP, and the scratch buffer ends up
/// holding exactly the returned bytes.
#[test]
fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
    let storage_root = B256::from([0x99; 32]);
    let info = Account {
        nonce: 7,
        balance: U256::from(42),
        bytecode_hash: Some(B256::from([0xAA; 32])),
    };
    // Pre-fill the buffer to show that stale content is replaced.
    let mut scratch = vec![0x00, 0x01];

    let encoded = encode_account_leaf_value(Some(info), storage_root, &mut scratch);
    let decoded = TrieAccount::decode(&mut &encoded[..]).expect("valid account RLP");

    assert_eq!(decoded.nonce, 7);
    assert_eq!(decoded.balance, U256::from(42));
    assert_eq!(decoded.storage_root, storage_root);
    assert_eq!(scratch, encoded);
}
989
/// With no state updates at all, `run` must return the parent root with empty trie updates
/// and must not reveal the blind accounts trie.
#[test]
fn run_returns_parent_root_without_revealing_blind_trie_when_no_state_updates() {
    let runtime = reth_tasks::Runtime::test();

    // Proof workers backed by an empty test database.
    let factory = create_test_provider_factory();
    let genesis_hash = factory.chain_spec().genesis_hash();
    let overlay_builder = OverlayBuilder::<reth_chain_state::EthPrimitives>::new(
        genesis_hash,
        ChangesetCache::new(),
    );
    let overlay_factory = OverlayStateProviderFactory::new(factory, overlay_builder);
    let proof_worker_handle =
        ProofWorkerHandle::new(&runtime, ProofTaskCtx::new(overlay_factory), false);

    // State trie whose accounts trie and default storage trie are both blind.
    let blind = RevealableSparseTrie::blind_from(ArenaParallelSparseTrie::default());
    let trie = SparseStateTrie::default()
        .with_accounts_trie(blind.clone())
        .with_default_storage_trie(blind)
        .with_updates(true);

    let parent_state_root = B256::from([0x55; 32]);
    let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
    let mut task = SparseTrieCacheTask::new_with_trie(
        &runtime,
        updates_rx,
        std::sync::mpsc::channel().0,
        proof_worker_handle,
        MultiProofTaskMetrics::default(),
        trie,
        parent_state_root,
        1,
    );

    // Signal the end of updates without ever sending one.
    updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
    drop(updates_tx);

    let outcome = task.run().expect("state root computation should succeed");

    assert_eq!(outcome.state_root, parent_state_root);
    assert!(outcome.trie_updates.is_empty());
    assert!(task.trie.state_trie_ref().is_none(), "blind trie should not be revealed");
}
1033}