1use std::sync::Arc;
4
5use crate::tree::{
6 multiproof::{
7 dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
8 DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9 },
10 payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::ParallelIterator;
16use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
17use reth_tasks::Runtime;
18use reth_trie::{
19 updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20 TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24 proof_task::{
25 AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26 },
27 root::ParallelStateRootError,
28};
29#[cfg(feature = "trie-debug")]
30use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
31use reth_trie_sparse::{
32 errors::SparseTrieResult, DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie,
33 SparseTrie,
34};
35use revm_primitives::{hash_map::Entry, B256Map};
36use tracing::{debug, debug_span, error, instrument, trace_span};
37
/// Threshold on `pending_updates` (messages received since the last call to
/// `process_new_updates`): once the counter exceeds this value, `run` applies the buffered
/// updates even though the updates channel still has messages queued.
38const MAX_PENDING_UPDATES: usize = 100;
40
/// Task that feeds hashed state updates into a [`SparseStateTrie`], requests multiproofs for the
/// leaves the trie reports as not yet updatable, and finally computes the state root (see `run`).
///
/// `A` and `S` are the two type parameters forwarded to `SparseStateTrie<A, S>`.
41pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
    /// Sender half of the proof-result channel; a clone is attached to every multiproof request
    /// dispatched by `dispatch_pending_targets`.
43 proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Receiver half of the proof-result channel, polled by `run`.
45 proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Messages produced by the hashing worker (`run_hashing_task`).
47 updates: CrossbeamReceiver<SparseTrieTaskMessage>,
    /// The sparse state trie all updates are applied to.
49 trie: SparseStateTrie<A, S>,
    /// Handle used to dispatch account multiproof requests and to query available workers.
51 proof_worker_handle: ProofWorkerHandle,
53
    /// Chunk size handed to `dispatch_with_chunking`; `run` also dispatches early once more
    /// than this many targets are pending.
54 chunk_size: usize,
    /// Forwarded to `dispatch_with_chunking`; initialised from
    /// `DEFAULT_MAX_TARGETS_FOR_CHUNKING`.
57 max_targets_for_chunking: usize,
61
    /// Account leaf updates already merged in by `process_new_updates` and still outstanding;
    /// they are re-applied by `process_leaf_updates(false)`.
62 account_updates: B256Map<LeafUpdate>,
    /// Outstanding storage leaf updates, keyed by hashed address and then by hashed slot.
64 storage_updates: B256Map<B256Map<LeafUpdate>>,
66
    /// Account leaf updates received since the last `process_new_updates` call.
67 new_account_updates: B256Map<LeafUpdate>,
    /// Storage leaf updates received since the last `process_new_updates` call.
69 new_storage_updates: B256Map<B256Map<LeafUpdate>>,
    /// Accounts whose leaf value still has to be encoded by `promote_pending_account_updates`.
    /// The outer `Option` is `None` when the account was registered only because of a storage
    /// change and `Some(_)` when the state update carried the account entry itself.
71 pending_account_updates: B256Map<Option<Option<Account>>>,
    /// Smallest `min_len` already requested for each account proof target.
85 fetched_account_targets: B256Map<u8>,
    /// Smallest `min_len` already requested for each storage proof target, per hashed address.
88 fetched_storage_targets: B256Map<B256Map<u8>>,
    /// Scratch buffer reused when RLP-encoding account leaf values.
91 account_rlp_buf: Vec<u8>,
    /// Set once `SparseTrieTaskMessage::FinishedStateUpdates` has been received.
93 finished_state_updates: bool,
    /// Number of account updates that `update_leaves` removed from the map it was given.
95 account_cache_hits: u64,
    /// Number of account updates left in the map after `update_leaves`.
97 account_cache_misses: u64,
    /// Number of storage updates that `update_leaves` removed from the map it was given.
99 storage_cache_hits: u64,
    /// Number of storage updates left in the map after `update_leaves`.
101 storage_cache_misses: u64,
    /// Proof targets collected but not yet dispatched to the proof workers.
103 pending_targets: PendingTargets,
    /// Number of messages received since the last `process_new_updates` call.
105 pending_updates: usize,
108
    /// Metrics recorded by `run`.
109 metrics: MultiProofTaskMetrics,
111}
112
113impl<A, S> SparseTrieCacheTask<A, S>
114where
115 A: SparseTrie + Default,
116 S: SparseTrie + Default + Clone,
117{
118 pub(super) fn new_with_trie(
120 executor: &Runtime,
121 updates: CrossbeamReceiver<MultiProofMessage>,
122 proof_worker_handle: ProofWorkerHandle,
123 metrics: MultiProofTaskMetrics,
124 trie: SparseStateTrie<A, S>,
125 chunk_size: usize,
126 ) -> Self {
127 let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
128 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
129
130 let parent_span = tracing::Span::current();
131 executor.spawn_blocking_named("trie-hashing", move || {
132 let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
133 Self::run_hashing_task(updates, hashed_state_tx)
134 });
135
136 Self {
137 proof_result_tx,
138 proof_result_rx,
139 updates: hashed_state_rx,
140 proof_worker_handle,
141 trie,
142 chunk_size,
143 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
144 account_updates: Default::default(),
145 storage_updates: Default::default(),
146 new_account_updates: Default::default(),
147 new_storage_updates: Default::default(),
148 pending_account_updates: Default::default(),
149 fetched_account_targets: Default::default(),
150 fetched_storage_targets: Default::default(),
151 account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
152 finished_state_updates: Default::default(),
153 account_cache_hits: 0,
154 account_cache_misses: 0,
155 storage_cache_hits: 0,
156 storage_cache_misses: 0,
157 pending_targets: Default::default(),
158 pending_updates: Default::default(),
159 metrics,
160 }
161 }
162
163 fn run_hashing_task(
166 updates: CrossbeamReceiver<MultiProofMessage>,
167 hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
168 ) {
169 while let Ok(message) = updates.recv() {
170 let msg = match message {
171 MultiProofMessage::PrefetchProofs(targets) => {
172 SparseTrieTaskMessage::PrefetchProofs(targets)
173 }
174 MultiProofMessage::StateUpdate(_, state) => {
175 let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
176 let hashed = evm_state_to_hashed_post_state(state);
177 SparseTrieTaskMessage::HashedState(hashed)
178 }
179 MultiProofMessage::FinishedStateUpdates => {
180 SparseTrieTaskMessage::FinishedStateUpdates
181 }
182 MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => {
183 continue
184 }
185 MultiProofMessage::HashedStateUpdate(state) => {
186 SparseTrieTaskMessage::HashedState(state)
187 }
188 };
189 if hashed_state_tx.send(msg).is_err() {
190 break;
191 }
192 }
193 }
194
195 pub(super) fn into_trie_for_reuse(
203 self,
204 prune_depth: usize,
205 max_storage_tries: usize,
206 max_nodes_capacity: usize,
207 max_values_capacity: usize,
208 disable_pruning: bool,
209 updates: &TrieUpdates,
210 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
211 let Self { mut trie, .. } = self;
212 trie.commit_updates(updates);
213 if !disable_pruning {
214 trie.prune(prune_depth, max_storage_tries);
215 trie.shrink_to(max_nodes_capacity, max_values_capacity);
216 }
217 let deferred = trie.take_deferred_drops();
218 (trie, deferred)
219 }
220
221 pub(super) fn into_cleared_trie(
226 self,
227 max_nodes_capacity: usize,
228 max_values_capacity: usize,
229 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
230 let Self { mut trie, .. } = self;
231 trie.clear();
232 trie.shrink_to(max_nodes_capacity, max_values_capacity);
233 let deferred = trie.take_deferred_drops();
234 (trie, deferred)
235 }
236
/// Runs the task until all state updates have been applied, then computes the state root.
///
/// Each loop iteration blocks until either a message from the hashing worker or a proof result
/// is available, handles it, and then decides how much follow-up work to do based on what is
/// still queued on the two channels. The loop ends once `FinishedStateUpdates` has been seen
/// and no account or storage updates remain outstanding.
///
/// # Errors
///
/// Returns an error if the updates channel disconnects before the loop finishes, if a proof
/// result carries an error, if revealing a multiproof or applying updates fails, or if the
/// final root computation fails.
///
/// # Panics
///
/// Hits `unreachable!` if the proof-result channel reports disconnection; the task itself
/// holds a sender for that channel.
237 #[instrument(
244 name = "SparseTrieCacheTask::run",
245 level = "debug",
246 target = "engine::tree::payload_processor::sparse_trie",
247 skip_all
248 )]
249 pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
    // Start of the whole run, used for the total-duration metric.
250 let now = Instant::now();
251
252 loop {
    // Start of this iteration; reused below as a phase timer.
253 let mut t = Instant::now();
    // NOTE(review): with `select_biased!` the `updates` arm is expected to win when both
    // channels are ready - confirm against the crossbeam-channel documentation.
254 crossbeam_channel::select_biased! {
255 recv(self.updates) -> message => {
256 self.metrics
257 .sparse_trie_channel_wait_duration_histogram
258 .record(t.elapsed());
259
    // A receive error means every sender is gone while the loop has not reached its
    // exit condition yet.
260 let update = match message {
261 Ok(m) => m,
262 Err(_) => {
263 return Err(ParallelStateRootError::Other(
264 "updates channel disconnected before state root calculation".to_string(),
265 ))
266 }
267 };
268
269 self.on_message(update);
    // Counted so that `process_new_updates` knows there is something to apply.
270 self.pending_updates += 1;
271 }
272 recv(self.proof_result_rx) -> message => {
273 let phase_end = Instant::now();
274 self.metrics
275 .sparse_trie_channel_wait_duration_histogram
276 .record(phase_end.duration_since(t));
277 t = phase_end;
278
279 let Ok(result) = message else {
280 unreachable!("we own the sender half")
281 };
282
    // Merge every proof result that is already queued into a single multiproof so the
    // trie is revealed once for the whole batch.
283 let mut result = result.result?;
284 while let Ok(next) = self.proof_result_rx.try_recv() {
285 let res = next.result?;
286 result.extend(res);
287 }
288
289 let phase_end = Instant::now();
290 self.metrics
291 .sparse_trie_proof_coalesce_duration_histogram
292 .record(phase_end.duration_since(t));
293 t = phase_end;
294
295 self.on_proof_result(result)?;
296 self.metrics
297 .sparse_trie_reveal_multiproof_duration_histogram
298 .record(t.elapsed());
299 },
300 }
301
    // Both channels are drained: dispatch what is pending, apply new updates, and try to
    // promote pending account updates.
302 if self.updates.is_empty() && self.proof_result_rx.is_empty() {
303 self.dispatch_pending_targets();
306 t = Instant::now();
307 self.process_new_updates()?;
308 self.promote_pending_account_updates()?;
309 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
310
    // Exit condition: no more state updates will arrive and nothing is outstanding.
311 if self.finished_state_updates &&
312 self.account_updates.is_empty() &&
313 self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
314 {
315 break;
316 }
317
    // The work above may have produced new proof targets.
318 self.dispatch_pending_targets();
    // Only proof results are queued, or too many updates were buffered: apply the new
    // updates and dispatch the resulting targets, skipping promotion.
319 } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
320 t = Instant::now();
323 self.process_new_updates()?;
324 self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
325 self.dispatch_pending_targets();
    // More updates are queued, but enough targets have piled up to dispatch them now.
326 } else if self.pending_targets.len() > self.chunk_size {
327 self.dispatch_pending_targets();
329 }
330 }
331
332 debug!(target: "engine::root", "All proofs processed, ending calculation");
333
334 let start = Instant::now();
335 let (state_root, trie_updates) =
336 self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
337 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
338 })?;
339
340 #[cfg(feature = "trie-debug")]
341 let debug_recorders = self.trie.take_debug_recorders();
342
343 let end = Instant::now();
344 self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
345 self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
346
    // Report the cache counters and reset them afterwards.
347 self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
348 self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
349 self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
350 self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
351 self.account_cache_hits = 0;
352 self.account_cache_misses = 0;
353 self.storage_cache_hits = 0;
354 self.storage_cache_misses = 0;
355
356 Ok(StateRootComputeOutcome {
357 state_root,
358 trie_updates: Arc::new(trie_updates),
359 #[cfg(feature = "trie-debug")]
360 debug_recorders,
361 })
362 }
363
364 fn on_message(&mut self, message: SparseTrieTaskMessage) {
366 match message {
367 SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
368 SparseTrieTaskMessage::HashedState(hashed_state) => {
369 self.on_hashed_state_update(hashed_state)
370 }
371 SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
372 }
373 }
374
375 #[instrument(
376 level = "trace",
377 target = "engine::tree::payload_processor::sparse_trie",
378 skip_all
379 )]
380 fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
381 for target in targets.account_targets {
382 self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
384 }
385
386 for (address, slots) in targets.storage_targets {
387 for slot in slots {
388 self.new_storage_updates
390 .entry(address)
391 .or_default()
392 .entry(slot.key())
393 .or_insert(LeafUpdate::Touched);
394 }
395
396 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
399 }
400 }
401
402 #[instrument(
404 level = "trace",
405 target = "engine::tree::payload_processor::sparse_trie",
406 skip_all
407 )]
408 fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
409 for (address, storage) in hashed_state_update.storages {
410 for (slot, value) in storage.storage {
411 let encoded = if value.is_zero() {
412 Vec::new()
413 } else {
414 alloy_rlp::encode_fixed_size(&value).to_vec()
415 };
416 self.new_storage_updates
417 .entry(address)
418 .or_default()
419 .insert(slot, LeafUpdate::Changed(encoded));
420
421 self.storage_updates.get_mut(&address).and_then(|updates| updates.remove(&slot));
423 }
424
425 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
428
429 self.pending_account_updates.entry(address).or_insert(None);
432 }
433
434 for (address, account) in hashed_state_update.accounts {
435 self.new_account_updates.insert(address, LeafUpdate::Touched);
440
441 self.pending_account_updates.insert(address, Some(account));
444 }
445 }
446
447 fn on_proof_result(
448 &mut self,
449 result: DecodedMultiProofV2,
450 ) -> Result<(), ParallelStateRootError> {
451 self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
452 ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
453 })
454 }
455
456 fn process_new_updates(&mut self) -> SparseTrieResult<()> {
457 if self.pending_updates == 0 {
458 return Ok(());
459 }
460
461 let _span = debug_span!("process_new_updates").entered();
462 self.pending_updates = 0;
463
464 self.process_leaf_updates(true)?;
466
467 for (address, mut new) in self.new_storage_updates.drain() {
468 match self.storage_updates.entry(address) {
469 Entry::Vacant(entry) => {
470 entry.insert(new); }
472 Entry::Occupied(mut entry) => {
473 let updates = entry.get_mut();
474 for (slot, new) in new.drain() {
475 match updates.entry(slot) {
476 Entry::Occupied(mut slot_entry) => {
477 if new.is_changed() {
478 slot_entry.insert(new);
479 }
480 }
481 Entry::Vacant(slot_entry) => {
482 slot_entry.insert(new);
483 }
484 }
485 }
486 }
487 }
488 }
489
490 for (address, new) in self.new_account_updates.drain() {
491 match self.account_updates.entry(address) {
492 Entry::Occupied(mut entry) => {
493 if new.is_changed() {
494 entry.insert(new);
495 }
496 }
497 Entry::Vacant(entry) => {
498 entry.insert(new);
499 }
500 }
501 }
502
503 Ok(())
504 }
505
506 #[instrument(
509 level = "debug",
510 target = "engine::tree::payload_processor::sparse_trie",
511 skip_all
512 )]
513 fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
514 let storage_updates =
515 if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
516
517 let span = debug_span!("process_storage_leaf_updates").entered();
519 for (address, updates) in storage_updates {
520 if updates.is_empty() {
521 continue;
522 }
523 let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
524
525 let trie = self.trie.get_or_create_storage_trie_mut(*address);
526 let fetched = self.fetched_storage_targets.entry(*address).or_default();
527 let mut targets = Vec::new();
528
529 let updates_len_before = updates.len();
530 trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
531 Entry::Occupied(mut entry) => {
532 if min_len < *entry.get() {
533 entry.insert(min_len);
534 targets.push(ProofV2Target::new(path).with_min_len(min_len));
535 }
536 }
537 Entry::Vacant(entry) => {
538 entry.insert(min_len);
539 targets.push(ProofV2Target::new(path).with_min_len(min_len));
540 }
541 })?;
542 let updates_len_after = updates.len();
543 self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
544 self.storage_cache_misses += updates_len_after as u64;
545
546 if !targets.is_empty() {
547 self.pending_targets.extend_storage_targets(address, targets);
548 }
549 }
550
551 drop(span);
552
553 self.process_account_leaf_updates(new)?;
555
556 Ok(())
557 }
558
559 #[instrument(
563 level = "debug",
564 target = "engine::tree::payload_processor::sparse_trie",
565 skip_all
566 )]
567 fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
568 let account_updates =
569 if new { &mut self.new_account_updates } else { &mut self.account_updates };
570
571 let updates_len_before = account_updates.len();
572
573 self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
574 match self.fetched_account_targets.entry(target) {
575 Entry::Occupied(mut entry) => {
576 if min_len < *entry.get() {
577 entry.insert(min_len);
578 self.pending_targets
579 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
580 }
581 }
582 Entry::Vacant(entry) => {
583 entry.insert(min_len);
584 self.pending_targets
585 .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
586 }
587 }
588 })?;
589
590 let updates_len_after = account_updates.len();
591 self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
592 self.account_cache_misses += updates_len_after as u64;
593
594 Ok(updates_len_after < updates_len_before)
595 }
596
/// Re-applies outstanding updates and turns pending account updates into encoded account leaf
/// updates wherever that is currently possible.
///
/// Steps:
/// 1. Re-run `process_leaf_updates(false)` on the outstanding updates.
/// 2. Compute, in parallel, the root of every storage trie whose entry in `storage_updates`
///    exists and is empty and whose root is not cached.
/// 3. Repeatedly walk `pending_account_updates`, moving every entry that can be encoded into
///    `account_updates` as `LeafUpdate::Changed`, and apply those to the account trie. The
///    loop stops when nothing was promoted or `process_account_leaf_updates` reports that no
///    update was removed.
///
/// # Panics
///
/// Panics if a storage root that is expected to be available cannot be computed or fetched,
/// or if an account value fails to decode as a `TrieAccount`.
597 #[instrument(
601 level = "debug",
602 target = "engine::tree::payload_processor::sparse_trie",
603 skip_all
604 )]
605 fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
606 self.process_leaf_updates(false)?;
607
608 if self.pending_account_updates.is_empty() {
609 return Ok(());
610 }
611
612 let span = debug_span!("compute_storage_roots").entered();
613 self
614 .trie
615 .storage_tries_mut()
616 .iter_mut()
    // Only tries whose outstanding update map exists and is empty, and whose root is not
    // cached yet.
617 .filter(|(address, trie)| {
618 self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
619 !trie.is_root_cached()
620 })
621 .par_bridge_buffered()
622 .for_each(|(address, trie)| {
623 let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_root", ?address).entered();
624 trie.root().expect("updates are drained, trie should be revealed by now");
625 });
626 drop(span);
627
628 loop {
629 let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
630 let account_rlp_buf = &mut self.account_rlp_buf;
632 let mut num_promoted = 0;
    // `retain` keeps an entry (closure returns `true`) while it cannot be promoted yet and
    // removes it (`false`) once its encoded leaf has been written to `account_updates`.
633 self.pending_account_updates.retain(|addr, account| {
634 if let Some(updates) = self.storage_updates.get(addr) {
635 if !updates.is_empty() {
    // Storage updates are still outstanding for this account: keep it pending.
636 return true;
    // Storage updates are drained and the account entry itself is known: combine it
    // with the storage root of its storage trie.
638 } else if let Some(account) = account.take() {
639 let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
640 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
641 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
642 num_promoted += 1;
643 return false;
644 }
645 }
646
    // Current encoded account value: the latest `Changed` update if there is one (an empty
    // encoding counts as "no account"), otherwise the value stored in the trie.
647 let trie_account = match self.account_updates.get(addr) {
649 Some(LeafUpdate::Changed(encoded)) => {
650 Some(encoded).filter(|encoded| !encoded.is_empty())
651 }
    // A `Touched` update is still outstanding for this account: keep it pending.
652 Some(LeafUpdate::Touched) => return true,
654 None => self.trie.get_account_value(addr),
655 };
656
657 let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
658
    // Known account entry: reuse the storage root of the current account value, or the empty
    // root if there is none. Otherwise keep the current account fields and take the storage
    // root from the storage trie.
659 let (account, storage_root) = if let Some(account) = account.take() {
660 let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
665
666 (account, storage_root)
667 } else {
668 (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
669 };
670
671 let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
672 self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
673 num_promoted += 1;
674
675 false
676 });
677 span.record("promoted", num_promoted);
678 drop(span);
679
    // Stop when nothing was promoted in this pass, or when applying the promoted updates
    // removed none of them from `account_updates`.
680 if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
685 break
686 }
687 }
688
689 Ok(())
690 }
691
692 fn dispatch_pending_targets(&mut self) {
693 if self.pending_targets.is_empty() {
694 return;
695 }
696
697 let _span = debug_span!("dispatch_pending_targets").entered();
698 let (targets, chunking_length) = self.pending_targets.take();
699 dispatch_with_chunking(
700 targets,
701 chunking_length,
702 self.chunk_size,
703 self.max_targets_for_chunking,
704 self.proof_worker_handle.available_account_workers(),
705 self.proof_worker_handle.available_storage_workers(),
706 MultiProofTargetsV2::chunks,
707 |proof_targets| {
708 if let Err(e) =
709 self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
710 targets: proof_targets,
711 proof_result_sender: ProofResultContext::new(
712 self.proof_result_tx.clone(),
713 HashedPostState::default(),
714 Instant::now(),
715 ),
716 })
717 {
718 error!("failed to dispatch account multiproof: {e:?}");
719 }
720 },
721 );
722 }
723}
724
725fn encode_account_leaf_value(
727 account: Option<Account>,
728 storage_root: B256,
729 account_rlp_buf: &mut Vec<u8>,
730) -> Vec<u8> {
731 if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
732 return Vec::new();
733 }
734
735 account_rlp_buf.clear();
736 account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
737 account_rlp_buf.clone()
738}
739
740#[derive(Default)]
742struct PendingTargets {
743 targets: MultiProofTargetsV2,
745 len: usize,
747}
748
749impl PendingTargets {
750 const fn len(&self) -> usize {
752 self.len
753 }
754
755 const fn is_empty(&self) -> bool {
757 self.len == 0
758 }
759
760 fn take(&mut self) -> (MultiProofTargetsV2, usize) {
762 (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
763 }
764
765 fn push_account_target(&mut self, target: ProofV2Target) {
767 self.targets.account_targets.push(target);
768 self.len += 1;
769 }
770
771 fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
773 self.len += targets.len();
774 self.targets.storage_targets.entry(*address).or_default().extend(targets);
775 }
776}
777
/// Messages sent from the hashing worker (`run_hashing_task`) to the sparse trie task.
778enum SparseTrieTaskMessage {
    /// A state update whose keys are already hashed.
780 HashedState(HashedPostState),
    /// Proof targets to prefetch; handled by `on_prewarm_targets`.
782 PrefetchProofs(MultiProofTargetsV2),
    /// Marker that sets `finished_state_updates` on the task when handled.
784 FinishedStateUpdates,
786}
787
/// Result returned by `SparseTrieCacheTask::run`.
788#[derive(Debug, Clone)]
791pub struct StateRootComputeOutcome {
    /// The computed state root.
792 pub state_root: B256,
    /// Trie updates returned alongside the root by `root_with_updates`.
794 pub trie_updates: Arc<TrieUpdates>,
    /// Debug recorders taken from the trie; only present with the `trie-debug` feature.
796 #[cfg(feature = "trie-debug")]
799 pub debug_recorders: Vec<(Option<B256>, TrieDebugRecorder)>,
800}
801
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, B256, U256};
    use reth_trie_sparse::ParallelSparseTrie;

    /// An already-hashed state update must pass through the hashing worker unchanged, followed
    /// by the finish marker, after which the output channel closes.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let (input_tx, input_rx) = crossbeam_channel::unbounded();
        let (output_tx, output_rx) = crossbeam_channel::unbounded();

        let hashed_address = keccak256(Address::random());
        let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let slot_value = U256::from(999);
        let sent_account = Account { balance: U256::from(100), nonce: 1, bytecode_hash: None };

        let mut sent_storage = reth_trie::HashedStorage::new(false);
        sent_storage.storage.insert(hashed_slot, slot_value);

        let mut sent_state = HashedPostState::default();
        sent_state.accounts.insert(hashed_address, Some(sent_account));
        sent_state.storages.insert(hashed_address, sent_storage);

        let worker = std::thread::spawn(move || {
            SparseTrieCacheTask::<ParallelSparseTrie, ParallelSparseTrie>::run_hashing_task(
                input_rx, output_tx,
            );
        });

        input_tx.send(MultiProofMessage::HashedStateUpdate(sent_state)).unwrap();
        input_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap();
        drop(input_tx);

        let first = output_rx.recv().unwrap();
        let SparseTrieTaskMessage::HashedState(forwarded) = first else {
            panic!("expected HashedState message");
        };

        let forwarded_account = forwarded.accounts.get(&hashed_address).unwrap().unwrap();
        assert_eq!(forwarded_account.balance, sent_account.balance);
        assert_eq!(forwarded_account.nonce, sent_account.nonce);

        let forwarded_storage = forwarded.storages.get(&hashed_address).unwrap();
        assert_eq!(*forwarded_storage.storage.get(&hashed_slot).unwrap(), slot_value);

        let finish_marker = output_rx.recv().unwrap();
        assert!(matches!(finish_marker, SparseTrieTaskMessage::FinishedStateUpdates));

        // The worker has exited and dropped its sender, so the channel is closed.
        assert!(output_rx.recv().is_err());
        worker.join().unwrap();
    }

    /// No account plus the empty storage root encodes to nothing and leaves the scratch buffer
    /// alone.
    #[test]
    fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
        let mut scratch = vec![0xAB];

        let leaf_value = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut scratch);

        assert!(leaf_value.is_empty());
        assert_eq!(scratch, vec![0xAB]);
    }

    /// A populated account is RLP-encoded with the given storage root, and the scratch buffer
    /// ends up holding the same bytes that are returned.
    #[test]
    fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
        let expected_root = B256::from([0x99; 32]);
        let populated = Account {
            nonce: 7,
            balance: U256::from(42),
            bytecode_hash: Some(B256::from([0xAA; 32])),
        };
        let mut scratch = vec![0x00, 0x01];

        let leaf_value = encode_account_leaf_value(Some(populated), expected_root, &mut scratch);
        let round_tripped = TrieAccount::decode(&mut &leaf_value[..]).expect("valid account RLP");

        assert_eq!(round_tripped.nonce, 7);
        assert_eq!(round_tripped.balance, U256::from(42));
        assert_eq!(round_tripped.storage_root, expected_root);
        assert_eq!(scratch, leaf_value);
    }
}