1use std::sync::Arc;
4
5use crate::tree::{
6 multiproof::{
7 dispatch_with_chunking, evm_state_to_hashed_post_state, StateRootComputeOutcome,
8 StateRootMessage, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9 },
10 payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19 updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20 TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24 proof_task::{
25 AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26 },
27 root::ParallelStateRootError,
28};
29use reth_trie_sparse::{
30 errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
31 RevealableSparseTrie, SparseStateTrie, SparseTrie,
32};
33use revm_primitives::{hash_map::Entry, B256Map};
34use tracing::{debug, debug_span, error, instrument, trace_span};
35
36const MAX_PENDING_UPDATES: usize = 100;
38
39pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
41 proof_result_tx: CrossbeamSender<ProofResultMessage>,
43 proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
45 updates: CrossbeamReceiver<SparseTrieTaskMessage>,
47 trie: SparseStateTrie<A, S>,
49 proof_worker_handle: ProofWorkerHandle,
51
52 chunk_size: usize,
55 max_targets_for_chunking: usize,
59
60 account_updates: B256Map<LeafUpdate>,
62 storage_updates: B256Map<B256Map<LeafUpdate>>,
64
65 new_account_updates: B256Map<LeafUpdate>,
67 new_storage_updates: B256Map<B256Map<LeafUpdate>>,
69 pending_account_updates: B256Map<Option<Option<Account>>>,
83 fetched_account_targets: B256Map<u8>,
86 fetched_storage_targets: B256Map<B256Map<u8>>,
89 account_rlp_buf: Vec<u8>,
91 finished_state_updates: bool,
93 account_cache_hits: u64,
95 account_cache_misses: u64,
97 storage_cache_hits: u64,
99 storage_cache_misses: u64,
101 pending_targets: PendingTargets,
103 pending_updates: usize,
106
107 metrics: MultiProofTaskMetrics,
109}
110
111impl<A, S> SparseTrieCacheTask<A, S>
112where
113 A: SparseTrie + Default,
114 S: SparseTrie + Default + Clone,
115{
116 pub(super) fn new_with_trie(
118 executor: &Runtime,
119 updates: CrossbeamReceiver<StateRootMessage>,
120 proof_worker_handle: ProofWorkerHandle,
121 metrics: MultiProofTaskMetrics,
122 trie: SparseStateTrie<A, S>,
123 chunk_size: usize,
124 ) -> Self {
125 let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
126 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
127
128 let parent_span = tracing::Span::current();
129 let hashing_metrics = metrics.clone();
130 executor.spawn_blocking_named("trie-hashing", move || {
131 let _span = trace_span!(parent: parent_span, "run_hashing_task").entered();
132 Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
133 });
134
135 Self {
136 proof_result_tx,
137 proof_result_rx,
138 updates: hashed_state_rx,
139 proof_worker_handle,
140 trie,
141 chunk_size,
142 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
143 account_updates: Default::default(),
144 storage_updates: Default::default(),
145 new_account_updates: Default::default(),
146 new_storage_updates: Default::default(),
147 pending_account_updates: Default::default(),
148 fetched_account_targets: Default::default(),
149 fetched_storage_targets: Default::default(),
150 account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
151 finished_state_updates: Default::default(),
152 account_cache_hits: 0,
153 account_cache_misses: 0,
154 storage_cache_hits: 0,
155 storage_cache_misses: 0,
156 pending_targets: Default::default(),
157 pending_updates: Default::default(),
158 metrics,
159 }
160 }
161
162 fn run_hashing_task(
165 updates: CrossbeamReceiver<StateRootMessage>,
166 hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
167 metrics: MultiProofTaskMetrics,
168 ) {
169 let mut total_idle_time = std::time::Duration::ZERO;
170 let mut idle_start = Instant::now();
171
172 while let Ok(message) = updates.recv() {
173 total_idle_time += idle_start.elapsed();
174
175 let msg = match message {
176 StateRootMessage::PrefetchProofs(targets) => {
177 SparseTrieTaskMessage::PrefetchProofs(targets)
178 }
179 StateRootMessage::StateUpdate(_, state) => {
180 let _span = trace_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
181 let hashed = evm_state_to_hashed_post_state(state);
182 SparseTrieTaskMessage::HashedState(hashed)
183 }
184 StateRootMessage::FinishedStateUpdates => {
185 SparseTrieTaskMessage::FinishedStateUpdates
186 }
187 StateRootMessage::BlockAccessList(_) => {
188 idle_start = Instant::now();
189 continue;
190 }
191 StateRootMessage::HashedStateUpdate(state) => {
192 SparseTrieTaskMessage::HashedState(state)
193 }
194 };
195 if hashed_state_tx.send(msg).is_err() {
196 break;
197 }
198
199 idle_start = Instant::now();
200 }
201
202 metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
203 }
204
205 pub(super) fn into_trie_for_reuse(
213 self,
214 max_hot_slots: usize,
215 max_hot_accounts: usize,
216 max_nodes_capacity: usize,
217 max_values_capacity: usize,
218 disable_pruning: bool,
219 updates: &TrieUpdates,
220 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
221 let Self { mut trie, .. } = self;
222 trie.commit_updates(updates);
223 if !disable_pruning {
224 trie.prune(max_hot_slots, max_hot_accounts);
225 trie.shrink_to(max_nodes_capacity, max_values_capacity);
226 }
227 let deferred = trie.take_deferred_drops();
228 (trie, deferred)
229 }
230
231 pub(super) fn into_cleared_trie(
236 self,
237 max_nodes_capacity: usize,
238 max_values_capacity: usize,
239 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
240 let Self { mut trie, .. } = self;
241 trie.clear();
242 trie.shrink_to(max_nodes_capacity, max_values_capacity);
243 let deferred = trie.take_deferred_drops();
244 (trie, deferred)
245 }
246
247 #[instrument(
254 name = "SparseTrieCacheTask::run",
255 level = "debug",
256 target = "engine::tree::payload_processor::sparse_trie",
257 skip_all
258 )]
259 pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
260 let now = Instant::now();
261
262 let mut total_idle_time = std::time::Duration::ZERO;
263 let mut idle_start = Instant::now();
264
265 loop {
266 let mut t = Instant::now();
267 crossbeam_channel::select_biased! {
268 recv(self.updates) -> message => {
269 let wake = Instant::now();
270
271 let update = match message {
272 Ok(m) => m,
273 Err(_) => {
274 return Err(ParallelStateRootError::Other(
275 "updates channel disconnected before state root calculation".to_string(),
276 ))
277 }
278 };
279
280 total_idle_time += wake.duration_since(idle_start);
281 self.metrics
282 .sparse_trie_channel_wait_duration_histogram
283 .record(wake.duration_since(t));
284
285 self.on_message(update);
286 self.pending_updates += 1;
287 }
288 recv(self.proof_result_rx) -> message => {
289 let phase_end = Instant::now();
290 total_idle_time += phase_end.duration_since(idle_start);
291 self.metrics
292 .sparse_trie_channel_wait_duration_histogram
293 .record(phase_end.duration_since(t));
294 t = phase_end;
295
296 let Ok(result) = message else {
297 unreachable!("we own the sender half")
298 };
299
300 let mut result = result.result?;
301 while let Ok(next) = self.proof_result_rx.try_recv() {
302 let res = next.result?;
303 result.extend(res);
304 }
305
306 let phase_end = Instant::now();
307 self.metrics
308 .sparse_trie_proof_coalesce_duration_histogram
309 .record(phase_end.duration_since(t));
310 t = phase_end;
311
312 self.on_proof_result(result)?;
313 self.metrics
314 .sparse_trie_reveal_multiproof_duration_histogram
315 .record(t.elapsed());
316 },
317 }
318
319 if self.updates.is_empty() && self.proof_result_rx.is_empty() {
320 self.dispatch_pending_targets();
323 t = Instant::now();
324 self.process_new_updates()?;
325 self.promote_pending_account_updates()?;
326 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
327
328 if self.finished_state_updates &&
329 self.account_updates.is_empty() &&
330 self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
331 {
332 break;
333 }
334
335 self.dispatch_pending_targets();
336
337 if self.proof_result_rx.is_empty() {
340 self.trie.calculate_subtries();
341 }
342 } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
343 t = Instant::now();
346 self.process_new_updates()?;
347 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
348 self.dispatch_pending_targets();
349 } else if self.pending_targets.len() > self.chunk_size {
350 self.dispatch_pending_targets();
352 }
353
354 idle_start = Instant::now();
355 }
356
357 self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());
358
359 debug!(target: "engine::root", "All proofs processed, ending calculation");
360
361 let start = Instant::now();
362 let (state_root, trie_updates) =
363 self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
364 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
365 })?;
366
367 #[cfg(feature = "trie-debug")]
368 let debug_recorders = self.trie.take_debug_recorders();
369
370 let end = Instant::now();
371 self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
372 self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
373
374 self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
375 self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
376 self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
377 self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
378 self.account_cache_hits = 0;
379 self.account_cache_misses = 0;
380 self.storage_cache_hits = 0;
381 self.storage_cache_misses = 0;
382
383 Ok(StateRootComputeOutcome {
384 state_root,
385 trie_updates: Arc::new(trie_updates),
386 #[cfg(feature = "trie-debug")]
387 debug_recorders,
388 })
389 }
390
391 fn on_message(&mut self, message: SparseTrieTaskMessage) {
393 match message {
394 SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
395 SparseTrieTaskMessage::HashedState(hashed_state) => {
396 self.on_hashed_state_update(hashed_state)
397 }
398 SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
399 }
400 }
401
402 #[instrument(
403 level = "trace",
404 target = "engine::tree::payload_processor::sparse_trie",
405 skip_all
406 )]
407 fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
408 for target in targets.account_targets {
409 self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
411 }
412
413 for (address, slots) in targets.storage_targets {
414 if !slots.is_empty() {
415 let new_updates = self.new_storage_updates.entry(address).or_default();
417 for slot in slots {
418 new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
420 }
421 }
422
423 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
426 }
427 }
428
429 #[instrument(
431 level = "trace",
432 target = "engine::tree::payload_processor::sparse_trie",
433 skip_all
434 )]
435 fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
436 for (address, storage) in hashed_state_update.storages {
437 if !storage.storage.is_empty() {
438 let new_updates = self.new_storage_updates.entry(address).or_default();
440 let mut existing_updates = self.storage_updates.get_mut(&address);
441
442 for (slot, value) in storage.storage {
443 self.trie.record_slot_touch(address, slot);
444
445 let encoded = if value.is_zero() {
446 Vec::new()
447 } else {
448 alloy_rlp::encode_fixed_size(&value).to_vec()
449 };
450 new_updates.insert(slot, LeafUpdate::Changed(encoded));
451
452 if let Some(ref mut existing) = existing_updates {
454 existing.remove(&slot);
455 }
456 }
457 }
458
459 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
462
463 self.pending_account_updates.entry(address).or_insert(None);
466 }
467
468 for (address, account) in hashed_state_update.accounts {
469 self.trie.record_account_touch(address);
470
471 self.new_account_updates.insert(address, LeafUpdate::Touched);
476
477 self.pending_account_updates.insert(address, Some(account));
480 }
481 }
482
483 fn on_proof_result(
484 &mut self,
485 result: DecodedMultiProofV2,
486 ) -> Result<(), ParallelStateRootError> {
487 self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
488 ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
489 })
490 }
491
492 fn process_new_updates(&mut self) -> SparseTrieResult<()> {
493 if self.pending_updates == 0 {
494 return Ok(());
495 }
496
497 let _span = debug_span!("process_new_updates").entered();
498 self.pending_updates = 0;
499
500 self.process_leaf_updates(true)?;
502
503 for (address, mut new) in self.new_storage_updates.drain() {
504 match self.storage_updates.entry(address) {
505 Entry::Vacant(entry) => {
506 entry.insert(new); }
508 Entry::Occupied(mut entry) => {
509 let updates = entry.get_mut();
510 for (slot, new) in new.drain() {
511 match updates.entry(slot) {
512 Entry::Occupied(mut slot_entry) => {
513 if new.is_changed() {
514 slot_entry.insert(new);
515 }
516 }
517 Entry::Vacant(slot_entry) => {
518 slot_entry.insert(new);
519 }
520 }
521 }
522 }
523 }
524 }
525
526 for (address, new) in self.new_account_updates.drain() {
527 match self.account_updates.entry(address) {
528 Entry::Occupied(mut entry) => {
529 if new.is_changed() {
530 entry.insert(new);
531 }
532 }
533 Entry::Vacant(entry) => {
534 entry.insert(new);
535 }
536 }
537 }
538
539 Ok(())
540 }
541
542 #[instrument(
545 level = "trace",
546 target = "engine::tree::payload_processor::sparse_trie",
547 skip_all
548 )]
549 fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
550 let storage_updates =
551 if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
552
553 let span = trace_span!("process_storage_leaf_updates").entered();
555 for (address, updates) in storage_updates {
556 if updates.is_empty() {
557 continue;
558 }
559 let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
560
561 let trie = self.trie.get_or_create_storage_trie_mut(*address);
562 let fetched = self.fetched_storage_targets.entry(*address).or_default();
563 let mut targets = Vec::new();
564
565 let updates_len_before = updates.len();
566 trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
567 Entry::Occupied(mut entry) => {
568 if min_len < *entry.get() {
569 entry.insert(min_len);
570 targets.push(ProofV2Target::new(path).with_min_len(min_len));
571 }
572 }
573 Entry::Vacant(entry) => {
574 entry.insert(min_len);
575 targets.push(ProofV2Target::new(path).with_min_len(min_len));
576 }
577 })?;
578 let updates_len_after = updates.len();
579 self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
580 self.storage_cache_misses += updates_len_after as u64;
581
582 if !targets.is_empty() {
583 self.pending_targets.extend_storage_targets(address, targets);
584 }
585 }
586
587 drop(span);
588
589 self.process_account_leaf_updates(new)?;
591
592 Ok(())
593 }
594
595 #[instrument(
599 level = "trace",
600 target = "engine::tree::payload_processor::sparse_trie",
601 skip_all
602 )]
603 fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
604 let account_updates =
605 if new { &mut self.new_account_updates } else { &mut self.account_updates };
606
607 let updates_len_before = account_updates.len();
608
609 self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
610 match self.fetched_account_targets.entry(target) {
611 Entry::Occupied(mut entry) => {
612 if min_len < *entry.get() {
613 entry.insert(min_len);
614 self.pending_targets
615 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
616 }
617 }
618 Entry::Vacant(entry) => {
619 entry.insert(min_len);
620 self.pending_targets
621 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
622 }
623 }
624 })?;
625
626 let updates_len_after = account_updates.len();
627 self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
628 self.account_cache_misses += updates_len_after as u64;
629
630 Ok(updates_len_after < updates_len_before)
631 }
632
633 fn compute_drained_storage_roots(&mut self) {
642 let addresses_to_compute_roots: Vec<_> = self
643 .storage_updates
644 .iter()
645 .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
646 .collect();
647
648 struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
649 unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
652
653 let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
654 Vec::with_capacity(addresses_to_compute_roots.len());
655 for address in addresses_to_compute_roots {
656 if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
657 !trie.is_root_cached()
658 {
659 tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
660 }
661 }
662
663 if tries_to_compute_roots.is_empty() {
664 return;
665 }
666
667 let parent_span =
668 debug_span!("compute_drained_storage_roots", n = tries_to_compute_roots.len());
669 tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
670 let span = if tracing::enabled!(tracing::Level::TRACE) {
671 debug_span!(
672 target: "engine::tree::payload_processor::sparse_trie",
673 parent: &parent_span,
674 "storage_root",
675 ?address
676 )
677 } else {
678 debug_span!(
679 target: "engine::tree::payload_processor::sparse_trie",
680 parent: &parent_span,
681 "storage_root",
682 )
683 };
684 let _enter = span.entered();
685 unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
692 });
693 }
694
695 #[instrument(
699 level = "trace",
700 target = "engine::tree::payload_processor::sparse_trie",
701 skip_all
702 )]
703 fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
704 self.process_leaf_updates(false)?;
705
706 if self.pending_account_updates.is_empty() {
707 return Ok(());
708 }
709
710 self.compute_drained_storage_roots();
711
712 loop {
713 let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
714 let account_rlp_buf = &mut self.account_rlp_buf;
716 let mut num_promoted = 0;
717 self.pending_account_updates.retain(|addr, account| {
718 if let Some(updates) = self.storage_updates.get(addr) {
719 if !updates.is_empty() {
720 return true;
722 } else if let Some(account) = account.take() {
723 let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
724 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
725 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
726 num_promoted += 1;
727 return false;
728 }
729 }
730
731 let trie_account = match self.account_updates.get(addr) {
733 Some(LeafUpdate::Changed(encoded)) => {
734 Some(encoded).filter(|encoded| !encoded.is_empty())
735 }
736 Some(LeafUpdate::Touched) => return true,
738 None => self.trie.get_account_value(addr),
739 };
740
741 let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
742
743 let (account, storage_root) = if let Some(account) = account.take() {
744 let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
749
750 (account, storage_root)
751 } else {
752 (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
753 };
754
755 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
756 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
757 num_promoted += 1;
758
759 false
760 });
761 span.record("promoted", num_promoted);
762 drop(span);
763
764 if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
769 break
770 }
771 }
772
773 Ok(())
774 }
775
776 fn dispatch_pending_targets(&mut self) {
777 if self.pending_targets.is_empty() {
778 return;
779 }
780
781 let _span = trace_span!("dispatch_pending_targets").entered();
782 let (targets, chunking_length) = self.pending_targets.take();
783 dispatch_with_chunking(
784 targets,
785 chunking_length,
786 self.chunk_size,
787 self.max_targets_for_chunking,
788 self.proof_worker_handle.has_multiple_idle_account_workers(),
789 self.proof_worker_handle.has_multiple_idle_storage_workers(),
790 MultiProofTargetsV2::chunks,
791 |proof_targets| {
792 if let Err(e) =
793 self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
794 targets: proof_targets,
795 proof_result_sender: ProofResultContext::new(
796 self.proof_result_tx.clone(),
797 HashedPostState::default(),
798 Instant::now(),
799 ),
800 })
801 {
802 error!("failed to dispatch account multiproof: {e:?}");
803 }
804 },
805 );
806 }
807}
808
809fn encode_account_leaf_value(
811 account: Option<Account>,
812 storage_root: B256,
813 account_rlp_buf: &mut Vec<u8>,
814) -> Vec<u8> {
815 if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
816 return Vec::new();
817 }
818
819 account_rlp_buf.clear();
820 account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
821 account_rlp_buf.clone()
822}
823
824#[derive(Default)]
826struct PendingTargets {
827 targets: MultiProofTargetsV2,
829 len: usize,
831}
832
833impl PendingTargets {
834 const fn len(&self) -> usize {
836 self.len
837 }
838
839 const fn is_empty(&self) -> bool {
841 self.len == 0
842 }
843
844 fn take(&mut self) -> (MultiProofTargetsV2, usize) {
846 (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
847 }
848
849 fn push_account_target(&mut self, target: ProofV2Target) {
851 self.targets.account_targets.push(target);
852 self.len += 1;
853 }
854
855 fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
857 self.len += targets.len();
858 self.targets.storage_targets.entry(*address).or_default().extend(targets);
859 }
860}
861
862enum SparseTrieTaskMessage {
864 HashedState(HashedPostState),
866 PrefetchProofs(MultiProofTargetsV2),
868 FinishedStateUpdates,
870}
871
872#[cfg(test)]
873mod tests {
874 use super::*;
875 use alloy_primitives::{keccak256, Address, B256, U256};
876 use reth_trie_sparse::ArenaParallelSparseTrie;
877
878 #[test]
879 fn test_run_hashing_task_hashed_state_update_forwards() {
880 let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
881 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
882
883 let address = keccak256(Address::random());
884 let slot = keccak256(U256::from(42).to_be_bytes::<32>());
885 let value = U256::from(999);
886
887 let mut hashed_state = HashedPostState::default();
888 hashed_state.accounts.insert(
889 address,
890 Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
891 );
892 let mut storage = reth_trie::HashedStorage::new(false);
893 storage.storage.insert(slot, value);
894 hashed_state.storages.insert(address, storage);
895
896 let expected_state = hashed_state.clone();
897
898 let handle = std::thread::spawn(move || {
899 SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
900 updates_rx,
901 hashed_state_tx,
902 MultiProofTaskMetrics::default(),
903 );
904 });
905
906 updates_tx.send(StateRootMessage::HashedStateUpdate(hashed_state)).unwrap();
907 updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
908 drop(updates_tx);
909
910 let SparseTrieTaskMessage::HashedState(received) = hashed_state_rx.recv().unwrap() else {
911 panic!("expected HashedState message");
912 };
913
914 let account = received.accounts.get(&address).unwrap().unwrap();
915 assert_eq!(account.balance, expected_state.accounts[&address].unwrap().balance);
916 assert_eq!(account.nonce, expected_state.accounts[&address].unwrap().nonce);
917
918 let storage = received.storages.get(&address).unwrap();
919 assert_eq!(*storage.storage.get(&slot).unwrap(), value);
920
921 let second = hashed_state_rx.recv().unwrap();
922 assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates));
923
924 assert!(hashed_state_rx.recv().is_err());
925 handle.join().unwrap();
926 }
927
928 #[test]
929 fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
930 let mut account_rlp_buf = vec![0xAB];
931 let encoded = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut account_rlp_buf);
932
933 assert!(encoded.is_empty());
934 assert_eq!(account_rlp_buf, vec![0xAB]);
936 }
937
938 #[test]
939 fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
940 let storage_root = B256::from([0x99; 32]);
941 let account = Some(Account {
942 nonce: 7,
943 balance: U256::from(42),
944 bytecode_hash: Some(B256::from([0xAA; 32])),
945 });
946 let mut account_rlp_buf = vec![0x00, 0x01];
947
948 let encoded = encode_account_leaf_value(account, storage_root, &mut account_rlp_buf);
949 let decoded = TrieAccount::decode(&mut &encoded[..]).expect("valid account RLP");
950
951 assert_eq!(decoded.nonce, 7);
952 assert_eq!(decoded.balance, U256::from(42));
953 assert_eq!(decoded.storage_root, storage_root);
954 assert_eq!(account_rlp_buf, encoded);
955 }
956}