1use crate::tree::{
4 multiproof::{
5 dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
6 VersionedMultiProofTargets, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
7 },
8 payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
9};
10use alloy_primitives::B256;
11use alloy_rlp::{Decodable, Encodable};
12use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
13use rayon::iter::ParallelIterator;
14use reth_primitives_traits::{Account, ParallelBridgeBuffered};
15use reth_tasks::Runtime;
16use reth_trie::{
17 proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
18 TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
19};
20use reth_trie_parallel::{
21 proof_task::{
22 AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
23 ProofWorkerHandle,
24 },
25 root::ParallelStateRootError,
26 targets_v2::MultiProofTargetsV2,
27};
28use reth_trie_sparse::{
29 errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
30 provider::{TrieNodeProvider, TrieNodeProviderFactory},
31 DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
32};
33use revm_primitives::{hash_map::Entry, B256Map};
34use smallvec::SmallVec;
35use std::{
36 sync::mpsc,
37 time::{Duration, Instant},
38};
39use tracing::{debug, debug_span, error, instrument, trace};
40
/// A sparse trie task in one of its two flavours.
///
/// The methods on this enum only forward to the wrapped task.
#[expect(clippy::large_enum_variant)]
pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
where
    BPF: TrieNodeProviderFactory + Send + Sync,
    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
    A: SparseTrie + Send + Sync + Default,
    S: SparseTrie + Send + Sync + Default + Clone,
{
    /// Wraps a [`SparseTrieTask`]; its trie is always cleared when it is handed back for reuse.
    Cleared(SparseTrieTask<BPF, A, S>),
    /// Wraps a [`SparseTrieCacheTask`]; its trie can be handed back pruned instead of cleared.
    Cached(SparseTrieCacheTask<A, S>),
}
53
54impl<BPF, A, S> SpawnedSparseTrieTask<BPF, A, S>
55where
56 BPF: TrieNodeProviderFactory + Send + Sync + Clone,
57 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
58 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
59 A: SparseTrie + Send + Sync + Default,
60 S: SparseTrie + Send + Sync + Default + Clone,
61{
62 pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
63 match self {
64 Self::Cleared(task) => task.run(),
65 Self::Cached(task) => task.run(),
66 }
67 }
68
69 pub(super) fn into_trie_for_reuse(
70 self,
71 prune_depth: usize,
72 max_storage_tries: usize,
73 max_nodes_capacity: usize,
74 max_values_capacity: usize,
75 disable_pruning: bool,
76 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
77 match self {
78 Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
79 Self::Cached(task) => task.into_trie_for_reuse(
80 prune_depth,
81 max_storage_tries,
82 max_nodes_capacity,
83 max_values_capacity,
84 disable_pruning,
85 ),
86 }
87 }
88
89 pub(super) fn into_cleared_trie(
90 self,
91 max_nodes_capacity: usize,
92 max_values_capacity: usize,
93 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
94 match self {
95 Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
96 Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
97 }
98 }
99}
100
/// Task that applies incoming [`SparseTrieUpdate`]s to a sparse state trie and, once the update
/// channel is closed, computes the state root.
pub(super) struct SparseTrieTask<BPF, A = ParallelSparseTrie, S = ParallelSparseTrie>
where
    BPF: TrieNodeProviderFactory + Send + Sync,
    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
    /// Receiving end of the channel delivering sparse trie updates.
    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
    /// The sparse state trie the updates are applied to.
    pub(super) trie: SparseStateTrie<A, S>,
    /// Metrics recorded while updating the trie and computing the root.
    pub(super) metrics: MultiProofTaskMetrics,
    /// Provider factory passed to the trie when applying updates and computing the root.
    blinded_provider_factory: BPF,
}
116
117impl<BPF, A, S> SparseTrieTask<BPF, A, S>
118where
119 BPF: TrieNodeProviderFactory + Send + Sync + Clone,
120 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
121 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
122 A: SparseTrie + Send + Sync + Default,
123 S: SparseTrie + Send + Sync + Default + Clone,
124{
125 pub(super) const fn new(
127 updates: mpsc::Receiver<SparseTrieUpdate>,
128 blinded_provider_factory: BPF,
129 metrics: MultiProofTaskMetrics,
130 trie: SparseStateTrie<A, S>,
131 ) -> Self {
132 Self { updates, metrics, trie, blinded_provider_factory }
133 }
134
135 #[instrument(
140 name = "SparseTrieTask::run",
141 level = "debug",
142 target = "engine::tree::payload_processor::sparse_trie",
143 skip_all
144 )]
145 pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
146 let now = Instant::now();
147
148 let mut num_iterations = 0;
149
150 while let Ok(mut update) = self.updates.recv() {
151 num_iterations += 1;
152 let mut num_updates = 1;
153 let _enter =
154 debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
155 .entered();
156 while let Ok(next) = self.updates.try_recv() {
157 update.extend(next);
158 num_updates += 1;
159 }
160 drop(_enter);
161
162 debug!(
163 target: "engine::root",
164 num_updates,
165 account_proofs = update.multiproof.account_proofs_len(),
166 storage_proofs = update.multiproof.storage_proofs_len(),
167 "Updating sparse trie"
168 );
169
170 let elapsed =
171 update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
172 .map_err(|e| {
173 ParallelStateRootError::Other(format!(
174 "could not calculate state root: {e:?}"
175 ))
176 })?;
177 self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
178 trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
179 }
180
181 debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
182
183 let start = Instant::now();
184 let (state_root, trie_updates) =
185 self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
186 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
187 })?;
188
189 let end = Instant::now();
190 self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
191 self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
192
193 Ok(StateRootComputeOutcome { state_root, trie_updates })
194 }
195
196 pub(super) fn into_cleared_trie(
201 self,
202 max_nodes_capacity: usize,
203 max_values_capacity: usize,
204 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
205 let Self { mut trie, .. } = self;
206 trie.clear();
207 trie.shrink_to(max_nodes_capacity, max_values_capacity);
208 let deferred = trie.take_deferred_drops();
209 (trie, deferred)
210 }
211}
212
/// Upper bound on the number of update messages `SparseTrieCacheTask::run` handles between two
/// `process_new_updates` calls: once `pending_updates` exceeds it, the buffered updates are
/// processed even though more messages are still queued.
const MAX_PENDING_UPDATES: usize = 100;
215
/// Task that keeps a sparse state trie across runs: it buffers leaf updates, requests the
/// multiproofs needed to apply them, and finally computes the state root.
pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
    /// Sender half of the proof result channel; cloned into every dispatched multiproof request.
    proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Receiver half of the proof result channel, polled by `run`.
    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Messages produced by the hashing task spawned in `new_with_trie`.
    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
    /// The sparse state trie being updated.
    trie: SparseStateTrie<A, S>,
    /// Handle used to dispatch multiproof requests; also passed to the trie when computing the
    /// final root.
    proof_worker_handle: ProofWorkerHandle,

    /// Chunk size forwarded to `dispatch_with_chunking`; `run` also uses it as the threshold for
    /// dispatching pending targets early.
    chunk_size: Option<usize>,
    /// Forwarded to `dispatch_with_chunking`; initialised to `DEFAULT_MAX_TARGETS_FOR_CHUNKING`.
    max_targets_for_chunking: usize,

    /// Account leaf updates merged in from `new_account_updates`.
    account_updates: B256Map<LeafUpdate>,
    /// Storage leaf updates merged in from `new_storage_updates`, keyed by account.
    storage_updates: B256Map<B256Map<LeafUpdate>>,

    /// Account leaf updates staged by the message handlers until `process_new_updates` runs.
    new_account_updates: B256Map<LeafUpdate>,
    /// Storage leaf updates staged by the message handlers until `process_new_updates` runs.
    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
    /// Accounts whose trie leaf still has to be re-encoded. The value is `None` when a state
    /// update only touched the account's storage, and `Some(account)` when the state update
    /// carried the account entry itself.
    pending_account_updates: B256Map<Option<Option<Account>>>,
    /// Per account path, the smallest `min_len` a proof target has been queued for so far.
    fetched_account_targets: B256Map<u8>,
    /// Per account and storage path, the smallest `min_len` a proof target has been queued for.
    fetched_storage_targets: B256Map<B256Map<u8>>,
    /// Scratch buffer reused when RLP-encoding accounts.
    account_rlp_buf: Vec<u8>,
    /// Set once a `FinishedStateUpdates` message has been received.
    finished_state_updates: bool,
    /// Proof targets collected but not yet dispatched.
    pending_targets: MultiProofTargetsV2,
    /// Number of update messages handled since the last `process_new_updates` call.
    pending_updates: usize,

    /// Metrics recorded while computing the final root.
    metrics: MultiProofTaskMetrics,
}
279
280impl<A, S> SparseTrieCacheTask<A, S>
281where
282 A: SparseTrie + Default,
283 S: SparseTrie + Default + Clone,
284{
285 pub(super) fn new_with_trie(
287 executor: &Runtime,
288 updates: CrossbeamReceiver<MultiProofMessage>,
289 proof_worker_handle: ProofWorkerHandle,
290 metrics: MultiProofTaskMetrics,
291 trie: SparseStateTrie<A, S>,
292 chunk_size: Option<usize>,
293 ) -> Self {
294 let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
295 let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
296
297 let parent_span = tracing::Span::current();
298 executor.spawn_blocking(move || {
299 let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
300 Self::run_hashing_task(updates, hashed_state_tx)
301 });
302
303 Self {
304 proof_result_tx,
305 proof_result_rx,
306 updates: hashed_state_rx,
307 proof_worker_handle,
308 trie,
309 chunk_size,
310 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
311 account_updates: Default::default(),
312 storage_updates: Default::default(),
313 new_account_updates: Default::default(),
314 new_storage_updates: Default::default(),
315 pending_account_updates: Default::default(),
316 fetched_account_targets: Default::default(),
317 fetched_storage_targets: Default::default(),
318 account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
319 finished_state_updates: Default::default(),
320 pending_targets: Default::default(),
321 pending_updates: Default::default(),
322 metrics,
323 }
324 }
325
326 fn run_hashing_task(
329 updates: CrossbeamReceiver<MultiProofMessage>,
330 hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
331 ) {
332 while let Ok(message) = updates.recv() {
333 let msg = match message {
334 MultiProofMessage::PrefetchProofs(targets) => {
335 SparseTrieTaskMessage::PrefetchProofs(targets)
336 }
337 MultiProofMessage::StateUpdate(_, state) => {
338 let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing state update", update_len = state.len()).entered();
339 let hashed = evm_state_to_hashed_post_state(state);
340 SparseTrieTaskMessage::HashedState(hashed)
341 }
342 MultiProofMessage::FinishedStateUpdates => {
343 SparseTrieTaskMessage::FinishedStateUpdates
344 }
345 MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => {
346 continue
347 }
348 MultiProofMessage::HashedStateUpdate(state) => {
349 SparseTrieTaskMessage::HashedState(state)
350 }
351 };
352 if hashed_state_tx.send(msg).is_err() {
353 break;
354 }
355 }
356 }
357
358 pub(super) fn into_trie_for_reuse(
366 self,
367 prune_depth: usize,
368 max_storage_tries: usize,
369 max_nodes_capacity: usize,
370 max_values_capacity: usize,
371 disable_pruning: bool,
372 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
373 let Self { mut trie, .. } = self;
374 if !disable_pruning {
375 trie.prune(prune_depth, max_storage_tries);
376 trie.shrink_to(max_nodes_capacity, max_values_capacity);
377 }
378 let deferred = trie.take_deferred_drops();
379 (trie, deferred)
380 }
381
382 pub(super) fn into_cleared_trie(
387 self,
388 max_nodes_capacity: usize,
389 max_values_capacity: usize,
390 ) -> (SparseStateTrie<A, S>, DeferredDrops) {
391 let Self { mut trie, .. } = self;
392 trie.clear();
393 trie.shrink_to(max_nodes_capacity, max_values_capacity);
394 let deferred = trie.take_deferred_drops();
395 (trie, deferred)
396 }
397
/// Drives the task: consumes update messages and proof results until all state updates have
/// been received and applied, then computes the state root.
///
/// # Errors
///
/// Returns an error if the updates channel disconnects before the loop finishes, if a proof
/// result carries an error, or if revealing a proof, applying updates or computing the final
/// root fails.
///
/// # Panics
///
/// Panics via `unreachable!` if a proof result is not `ProofResult::V2`.
#[instrument(
    name = "SparseTrieCacheTask::run",
    level = "debug",
    target = "engine::tree::payload_processor::sparse_trie",
    skip_all
)]
pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
    let now = Instant::now();

    loop {
        // `select_biased!` prefers the first ready arm, so queued update messages are handled
        // before proof results.
        crossbeam_channel::select_biased! {
            recv(self.updates) -> message => {
                let update = match message {
                    Ok(m) => m,
                    Err(_) => {
                        return Err(ParallelStateRootError::Other(
                            "updates channel disconnected before state root calculation".to_string(),
                        ))
                    }
                };

                self.on_message(update);
                self.pending_updates += 1;
            }
            recv(self.proof_result_rx) -> message => {
                // `self` holds `proof_result_tx`, so this channel cannot disconnect.
                let Ok(result) = message else {
                    unreachable!("we own the sender half")
                };
                let ProofResult::V2(mut result) = result.result? else {
                    unreachable!("sparse trie as cache must only be used with multiproof v2");
                };

                // Merge every proof result that is already queued and reveal them in one go.
                while let Ok(next) = self.proof_result_rx.try_recv() {
                    let ProofResult::V2(res) = next.result? else {
                        unreachable!("sparse trie as cache must only be used with multiproof v2");
                    };
                    result.extend(res);
                }

                self.on_proof_result(result)?;
            },
        }

        if self.updates.is_empty() && self.proof_result_rx.is_empty() {
            // Both channels are idle: dispatch what is pending, apply the buffered updates and
            // try to promote pending account updates.
            self.dispatch_pending_targets();
            self.process_new_updates()?;
            self.promote_pending_account_updates()?;

            // Done once the finished marker was seen and no account or storage update is left.
            if self.finished_state_updates &&
                self.account_updates.is_empty() &&
                self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
            {
                break;
            }

            self.dispatch_pending_targets();
        } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
            // No more update messages are queued, or too many were handled since the last
            // processing round: apply the buffered updates now.
            self.process_new_updates()?;
            self.dispatch_pending_targets();
        } else if self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default() {
            // Enough targets have accumulated to dispatch them early.
            self.dispatch_pending_targets();
        }
    }

    debug!(target: "engine::root", "All proofs processed, ending calculation");

    let start = Instant::now();
    let (state_root, trie_updates) =
        self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
            ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
        })?;

    let end = Instant::now();
    self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
    self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));

    Ok(StateRootComputeOutcome { state_root, trie_updates })
}
487
488 fn on_message(&mut self, message: SparseTrieTaskMessage) {
490 match message {
491 SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
492 SparseTrieTaskMessage::HashedState(hashed_state) => {
493 self.on_hashed_state_update(hashed_state)
494 }
495 SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
496 }
497 }
498
499 #[instrument(
500 level = "trace",
501 target = "engine::tree::payload_processor::sparse_trie",
502 skip_all
503 )]
504 fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
505 let VersionedMultiProofTargets::V2(targets) = targets else {
506 unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
507 };
508
509 for target in targets.account_targets {
510 self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
512 }
513
514 for (address, slots) in targets.storage_targets {
515 for slot in slots {
516 self.new_storage_updates
518 .entry(address)
519 .or_default()
520 .entry(slot.key())
521 .or_insert(LeafUpdate::Touched);
522 }
523
524 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
527 }
528 }
529
530 #[instrument(
532 level = "trace",
533 target = "engine::tree::payload_processor::sparse_trie",
534 skip_all
535 )]
536 fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
537 for (address, storage) in hashed_state_update.storages {
538 for (slot, value) in storage.storage {
539 let encoded = if value.is_zero() {
540 Vec::new()
541 } else {
542 alloy_rlp::encode_fixed_size(&value).to_vec()
543 };
544 self.new_storage_updates
545 .entry(address)
546 .or_default()
547 .insert(slot, LeafUpdate::Changed(encoded));
548
549 self.storage_updates.get_mut(&address).and_then(|updates| updates.remove(&slot));
551 }
552
553 self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
556
557 self.pending_account_updates.entry(address).or_insert(None);
560 }
561
562 for (address, account) in hashed_state_update.accounts {
563 self.new_account_updates.insert(address, LeafUpdate::Touched);
568
569 self.pending_account_updates.insert(address, Some(account));
572 }
573 }
574
575 fn on_proof_result(
576 &mut self,
577 result: DecodedMultiProofV2,
578 ) -> Result<(), ParallelStateRootError> {
579 self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
580 ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
581 })
582 }
583
584 #[instrument(
585 level = "debug",
586 target = "engine::tree::payload_processor::sparse_trie",
587 skip_all
588 )]
589 fn process_new_updates(&mut self) -> SparseTrieResult<()> {
590 self.pending_updates = 0;
591
592 self.process_leaf_updates(true)?;
594
595 for (address, mut new) in self.new_storage_updates.drain() {
596 let updates = self.storage_updates.entry(address).or_default();
597 for (slot, new) in new.drain() {
598 match updates.entry(slot) {
599 Entry::Occupied(mut entry) => {
600 if new.is_changed() {
602 entry.insert(new);
603 }
604 }
605 Entry::Vacant(entry) => {
606 entry.insert(new);
607 }
608 }
609 }
610 }
611
612 for (address, new) in self.new_account_updates.drain() {
613 match self.account_updates.entry(address) {
614 Entry::Occupied(mut entry) => {
615 if new.is_changed() {
616 entry.insert(new);
617 }
618 }
619 Entry::Vacant(entry) => {
620 entry.insert(new);
621 }
622 }
623 }
624
625 Ok(())
626 }
627
/// Applies leaf updates to the storage tries and then to the account trie.
///
/// `new` selects the staged maps (`new_storage_updates` / `new_account_updates`) when `true`
/// and the queued maps (`storage_updates` / `account_updates`) when `false`.
#[instrument(
    level = "debug",
    target = "engine::tree::payload_processor::sparse_trie",
    skip_all
)]
fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
    let storage_updates =
        if new { &mut self.new_storage_updates } else { &mut self.storage_updates };

    // Captured so the per-trie spans created on worker threads can be parented to it.
    let span = tracing::Span::current();
    let storage_results = storage_updates
        .iter_mut()
        .filter(|(_, updates)| !updates.is_empty())
        .map(|(address, updates)| {
            // Take the storage trie and its fetched-target bookkeeping out of `self` so that
            // each item owns what it needs; both are put back after the parallel step.
            let trie = self.trie.take_or_create_storage_trie(address);
            let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();

            (address, updates, fetched, trie)
        })
        .par_bridge_buffered()
        .map(|(address, updates, mut fetched, mut trie)| {
            let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie leaf updates", ?address).entered();
            let mut targets = Vec::new();

            // The callback receives a path and a `min_len` (presumably for leaves that need a
            // proof before they can be applied — confirm against `update_leaves`). A target is
            // queued only for a path not requested before, or requested with a larger `min_len`.
            trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
                Entry::Occupied(mut entry) => {
                    if min_len < *entry.get() {
                        entry.insert(min_len);
                        targets.push(Target::new(path).with_min_len(min_len));
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(min_len);
                    targets.push(Target::new(path).with_min_len(min_len));
                }
            })?;

            SparseTrieResult::Ok((address, targets, fetched, trie))
        })
        .collect::<Result<Vec<_>, _>>()?;

    drop(span);

    // Put the tries and bookkeeping back and queue the collected storage proof targets.
    for (address, targets, fetched, trie) in storage_results {
        self.fetched_storage_targets.insert(*address, fetched);
        self.trie.insert_storage_trie(*address, trie);

        if !targets.is_empty() {
            self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
        }
    }

    // The returned progress flag is not needed here; only errors are propagated.
    self.process_account_leaf_updates(new)?;

    Ok(())
}
688
689 #[instrument(
693 level = "debug",
694 target = "engine::tree::payload_processor::sparse_trie",
695 skip_all
696 )]
697 fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
698 let account_updates =
699 if new { &mut self.new_account_updates } else { &mut self.account_updates };
700
701 let updates_len_before = account_updates.len();
702
703 self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
704 match self.fetched_account_targets.entry(target) {
705 Entry::Occupied(mut entry) => {
706 if min_len < *entry.get() {
707 entry.insert(min_len);
708 self.pending_targets
709 .account_targets
710 .push(Target::new(target).with_min_len(min_len));
711 }
712 }
713 Entry::Vacant(entry) => {
714 entry.insert(min_len);
715 self.pending_targets
716 .account_targets
717 .push(Target::new(target).with_min_len(min_len));
718 }
719 }
720 })?;
721
722 Ok(account_updates.len() < updates_len_before)
723 }
724
/// Turns pending account updates into concrete `LeafUpdate::Changed` entries in
/// `account_updates` once the data needed to encode them is available.
///
/// Steps: apply the queued leaf updates, compute storage roots for storage tries whose queued
/// updates are fully drained, then repeatedly promote pending accounts and apply the resulting
/// account leaf updates until no further progress is made.
///
/// # Panics
///
/// Panics if a storage root that is expected to be available cannot be computed or read, or if
/// an account value stored in the trie is not valid RLP.
#[instrument(
    level = "debug",
    target = "engine::tree::payload_processor::sparse_trie",
    skip_all
)]
fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
    self.process_leaf_updates(false)?;

    if self.pending_account_updates.is_empty() {
        return Ok(());
    }

    // Compute roots only for storage tries that have an (empty) entry in `storage_updates`,
    // i.e. all their queued updates are applied, and whose root is not cached yet.
    let span = debug_span!("compute_storage_roots").entered();
    self
        .trie
        .storage_tries_mut()
        .iter_mut()
        .filter(|(address, trie)| {
            self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
                !trie.is_root_cached()
        })
        .par_bridge_buffered()
        .for_each(|(address, trie)| {
            let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
            trie.root().expect("updates are drained, trie should be revealed by now");
        });
    drop(span);

    loop {
        let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
        let account_rlp_buf = &mut self.account_rlp_buf;
        let mut num_promoted = 0;
        // `retain` keeps an entry (returns `true`) while it cannot be promoted yet and removes
        // it (returns `false`) once its encoded leaf is written to `account_updates`.
        self.pending_account_updates.retain(|addr, account| {
            if let Some(updates) = self.storage_updates.get(addr) {
                if !updates.is_empty() {
                    // Storage updates are still outstanding, so the storage root is not final.
                    return true;
                } else if let Some(account) = account.take() {
                    // Storage is drained and the state update carried the account: encode it
                    // with the storage root read from the storage trie.
                    let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
                    let encoded = if account.is_none_or(|account| account.is_empty()) &&
                        storage_root == EMPTY_ROOT_HASH
                    {
                        // Missing or empty account without storage: use an empty encoding.
                        Vec::new()
                    } else {
                        account_rlp_buf.clear();
                        account
                            .unwrap_or_default()
                            .into_trie_account(storage_root)
                            .encode(account_rlp_buf);
                        account_rlp_buf.clone()
                    };
                    self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
                    num_promoted += 1;
                    return false;
                }
            }

            // Find the account's current encoded value: a queued `Changed` update wins (an
            // empty encoding counts as no account); with no queued update at all the value is
            // read from the trie; any other queued update means the entry has to wait.
            let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
                Some(encoded).filter(|encoded| !encoded.is_empty())
            } else if !self.account_updates.contains_key(addr) {
                self.trie.get_account_value(addr)
            } else {
                return true;
            };

            let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));

            let (account, storage_root) = if let Some(account) = account.take() {
                // Account data came with the state update: keep the storage root of the
                // current value, or the empty root if there is no current value.
                let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);

                (account, storage_root)
            } else {
                // Only storage changed: keep the current account fields and take the storage
                // root from the storage trie.
                (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
            };

            let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
                Vec::new()
            } else {
                account_rlp_buf.clear();
                account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
                account_rlp_buf.clone()
            };
            self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
            num_promoted += 1;

            false
        });
        span.record("promoted", num_promoted);
        drop(span);

        // Stop when nothing was promoted in this round, or when applying the account leaf
        // updates did not shrink `account_updates`.
        if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
            break
        }
    }

    Ok(())
}
836
837 #[instrument(
838 level = "debug",
839 target = "engine::tree::payload_processor::sparse_trie",
840 skip_all
841 )]
842 fn dispatch_pending_targets(&mut self) {
843 if !self.pending_targets.is_empty() {
844 let chunking_length = self.pending_targets.chunking_length();
845 dispatch_with_chunking(
846 std::mem::take(&mut self.pending_targets),
847 chunking_length,
848 self.chunk_size,
849 self.max_targets_for_chunking,
850 self.proof_worker_handle.available_account_workers(),
851 self.proof_worker_handle.available_storage_workers(),
852 MultiProofTargetsV2::chunks,
853 |proof_targets| {
854 if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
855 AccountMultiproofInput::V2 {
856 targets: proof_targets,
857 proof_result_sender: ProofResultContext::new(
858 self.proof_result_tx.clone(),
859 0,
860 HashedPostState::default(),
861 Instant::now(),
862 ),
863 },
864 ) {
865 error!("failed to dispatch account multiproof: {e:?}");
866 }
867 },
868 );
869 }
870 }
871}
872
/// Message sent from the hashing task to [`SparseTrieCacheTask`].
enum SparseTrieTaskMessage {
    /// A state update in hashed form.
    HashedState(HashedPostState),
    /// Proof targets to prefetch.
    PrefetchProofs(VersionedMultiProofTargets),
    /// Marker that no further state updates will follow.
    FinishedStateUpdates,
}
882
/// Result of a finished sparse trie task.
#[derive(Debug)]
pub struct StateRootComputeOutcome {
    /// The computed state root.
    pub state_root: B256,
    /// The trie updates returned alongside the state root.
    pub trie_updates: TrieUpdates,
}
892
/// Reveals the multiproof contained in `SparseTrieUpdate` and applies its state changes to
/// `trie`.
///
/// Storage tries are updated in parallel first, then the account trie is updated, and finally
/// `calculate_subtries` is invoked on the trie.
///
/// Returns the time elapsed since the start of the call.
///
/// # Errors
///
/// Returns an error if revealing the proof or any trie operation fails, or with
/// `SparseTrieErrorKind::Blind` if a storage trie that has to be updated is not present.
#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
pub(crate) fn update_sparse_trie<BPF, A, S>(
    trie: &mut SparseStateTrie<A, S>,
    SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
    blinded_provider_factory: &BPF,
) -> SparseStateTrieResult<Duration>
where
    BPF: TrieNodeProviderFactory + Send + Sync,
    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
    A: SparseTrie + Send + Sync + Default,
    S: SparseTrie + Send + Sync + Default + Clone,
{
    trace!(target: "engine::root::sparse", "Updating sparse trie");
    let started_at = Instant::now();

    // Reveal the proof nodes first, using the method matching the proof version.
    match multiproof {
        ProofResult::Legacy(decoded, _) => {
            trie.reveal_decoded_multiproof(decoded)?;
        }
        ProofResult::V2(decoded_v2) => {
            trie.reveal_decoded_multiproof_v2(decoded_v2)?;
        }
    }
    let reveal_multiproof_elapsed = started_at.elapsed();
    trace!(
        target: "engine::root::sparse",
        ?reveal_multiproof_elapsed,
        "Done revealing multiproof"
    );

    // Captured so the per-storage-trie spans created on worker threads can be parented to it.
    let span = tracing::Span::current();
    // Take each affected storage trie out of `trie` so that it can be updated independently,
    // then update the storage tries in parallel.
    let results: Vec<_> = state
        .storages
        .into_iter()
        .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
        .par_bridge_buffered()
        .map(|(address, storage, storage_trie)| {
            let _enter =
                debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)
                    .entered();

            trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
            let storage_provider = blinded_provider_factory.storage_node_provider(address);
            // A missing storage trie is reported as a blind-node error.
            let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;

            if storage.wiped {
                trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
                storage_trie.wipe()?;
            }

            // Zero-valued slots are removals; they are collected here and applied after all
            // leaf updates.
            let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();

            for (slot, value) in storage.storage {
                let slot_nibbles = Nibbles::unpack(slot);

                if value.is_zero() {
                    removed_slots.push(slot_nibbles);
                    continue;
                }

                trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
                storage_trie.update_leaf(
                    slot_nibbles,
                    alloy_rlp::encode_fixed_size(&value).to_vec(),
                    &storage_provider,
                )?;
            }

            for slot_nibbles in removed_slots {
                trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
                storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
            }

            // Compute the storage root on the worker thread; the returned value is not used here.
            storage_trie.root();

            SparseStateTrieResult::Ok((address, storage_trie))
        })
        .collect();

    // Account removals are collected here and applied after all account updates.
    let mut removed_accounts = Vec::new();

    let _enter =
        tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
            .entered();
    for result in results {
        let (address, storage_trie) = result?;
        // Put the updated storage trie back before touching the account.
        trie.insert_storage_trie(address, storage_trie);

        if let Some(account) = state.accounts.remove(&address) {
            trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
            // A `false` return marks the account for removal below.
            if !trie.update_account(
                address,
                account.unwrap_or_default(),
                blinded_provider_factory,
            )? {
                removed_accounts.push(address);
            }
        } else if trie.is_account_revealed(address) {
            // Only storage changed for this account: refresh its storage root.
            trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
            if !trie.update_account_storage_root(address, blinded_provider_factory)? {
                removed_accounts.push(address);
            }
        }
    }

    // Accounts left in `state.accounts` had no storage changes in this update.
    for (address, account) in state.accounts {
        trace!(target: "engine::root::sparse", ?address, "Updating account");
        if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
            removed_accounts.push(address);
        }
    }

    for address in removed_accounts {
        trace!(target: "engine::root::sparse", ?address, "Removing account");
        let nibbles = Nibbles::unpack(address);
        trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
    }

    let elapsed_before = started_at.elapsed();
    trace!(
        target: "engine::root::sparse",
        "Calculating subtries"
    );
    trie.calculate_subtries();

    let elapsed = started_at.elapsed();
    // Time spent in `calculate_subtries` alone.
    let below_level_elapsed = elapsed - elapsed_before;
    trace!(
        target: "engine::root::sparse",
        ?below_level_elapsed,
        "Intermediate nodes calculated"
    );

    Ok(elapsed)
}
1046
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, U256};
    use reth_trie_sparse::ParallelSparseTrie;

    /// An already hashed state update must be forwarded unchanged by the hashing task,
    /// followed by the finished marker; after that the output channel must be closed.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();

        let hashed_address = keccak256(Address::random());
        let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let slot_value = U256::from(999);

        // Build a state update holding one account with a single storage slot.
        let mut slots = reth_trie::HashedStorage::new(false);
        slots.storage.insert(hashed_slot, slot_value);

        let mut hashed_state = HashedPostState::default();
        hashed_state.accounts.insert(
            hashed_address,
            Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
        );
        hashed_state.storages.insert(hashed_address, slots);

        let expected_state = hashed_state.clone();

        let worker = std::thread::spawn(move || {
            SparseTrieCacheTask::<ParallelSparseTrie, ParallelSparseTrie>::run_hashing_task(
                updates_rx,
                hashed_state_tx,
            );
        });

        updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
        updates_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap();
        drop(updates_tx);

        // First message: the hashed state, forwarded as it was sent.
        let received = match hashed_state_rx.recv().unwrap() {
            SparseTrieTaskMessage::HashedState(state) => state,
            _ => panic!("expected HashedState message"),
        };

        let expected_account = expected_state.accounts[&hashed_address].unwrap();
        let received_account = received.accounts.get(&hashed_address).unwrap().unwrap();
        assert_eq!(received_account.balance, expected_account.balance);
        assert_eq!(received_account.nonce, expected_account.nonce);

        let received_storage = received.storages.get(&hashed_address).unwrap();
        assert_eq!(received_storage.storage[&hashed_slot], slot_value);

        // Second message: the finished marker.
        let marker = hashed_state_rx.recv().unwrap();
        assert!(matches!(marker, SparseTrieTaskMessage::FinishedStateUpdates));

        // The hashing task exits once its input is closed, which closes the output channel.
        assert!(hashed_state_rx.recv().is_err());
        worker.join().unwrap();
    }
}