1use crate::tree::{
4 cached_state::CachedStateProvider, payload_processor::bal::bal_to_hashed_post_state,
5};
6use alloy_eip7928::BlockAccessList;
7use alloy_evm::block::StateChangeSource;
8use alloy_primitives::{keccak256, map::HashSet, B256};
9use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
10use dashmap::DashMap;
11use derive_more::derive::Deref;
12use metrics::{Gauge, Histogram};
13use rayon::prelude::*;
14use reth_metrics::Metrics;
15use reth_provider::AccountReader;
16use reth_revm::state::EvmState;
17use reth_trie::{
18 added_removed_keys::MultiAddedRemovedKeys, DecodedMultiProof, HashedPostState, HashedStorage,
19 MultiProofTargets,
20};
21use reth_trie_parallel::{
22 proof::ParallelProof,
23 proof_task::{
24 AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
25 },
26};
27use std::{collections::BTreeMap, mem, ops::DerefMut, sync::Arc, time::Instant};
28use tracing::{debug, error, instrument, trace};
29
30#[derive(Clone, Copy)]
32pub enum Source {
33 Evm(StateChangeSource),
35 BlockAccessList,
37}
38
39impl std::fmt::Debug for Source {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 Self::Evm(source) => source.fmt(f),
43 Self::BlockAccessList => f.write_str("BlockAccessList"),
44 }
45 }
46}
47
48impl From<StateChangeSource> for Source {
49 fn from(source: StateChangeSource) -> Self {
50 Self::Evm(source)
51 }
52}
53
54const PREFETCH_MAX_BATCH_TARGETS: usize = 512;
58
59const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;
62
63const STATE_UPDATE_MAX_BATCH_TARGETS: usize = 64;
67
68const STATE_UPDATE_BATCH_PREALLOC: usize = 16;
70
71const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
74
75#[derive(Default, Debug)]
78pub struct SparseTrieUpdate {
79 pub(crate) state: HashedPostState,
81 pub(crate) multiproof: DecodedMultiProof,
83}
84
85impl SparseTrieUpdate {
86 pub(super) fn is_empty(&self) -> bool {
88 self.state.is_empty() && self.multiproof.is_empty()
89 }
90
91 #[cfg(test)]
93 pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
94 Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
95 }
96
97 pub(super) fn extend(&mut self, other: Self) {
99 self.state.extend(other.state);
100 self.multiproof.extend(other.multiproof);
101 }
102}
103
104#[derive(Debug)]
106pub(super) enum MultiProofMessage {
107 PrefetchProofs(MultiProofTargets),
109 StateUpdate(Source, EvmState),
111 EmptyProof {
116 sequence_number: u64,
118 state: HashedPostState,
120 },
121 BlockAccessList(Arc<BlockAccessList>),
126 FinishedStateUpdates,
131}
132
133#[derive(Debug, Default)]
135struct ProofSequencer {
136 next_sequence: u64,
138 next_to_deliver: u64,
140 pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
142}
143
144impl ProofSequencer {
145 const fn next_sequence(&mut self) -> u64 {
147 let seq = self.next_sequence;
148 self.next_sequence += 1;
149 seq
150 }
151
152 fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
155 if sequence >= self.next_to_deliver {
156 self.pending_proofs.insert(sequence, update);
157 }
158
159 if !self.pending_proofs.contains_key(&self.next_to_deliver) {
161 return Vec::new()
162 }
163
164 let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
165 let mut current_sequence = self.next_to_deliver;
166
167 while let Some(pending) = self.pending_proofs.remove(¤t_sequence) {
169 consecutive_proofs.push(pending);
170 current_sequence += 1;
171 }
172
173 self.next_to_deliver += consecutive_proofs.len() as u64;
174
175 consecutive_proofs
176 }
177
178 pub(crate) fn has_pending(&self) -> bool {
180 !self.pending_proofs.is_empty()
181 }
182}
183
184#[derive(Deref, Debug)]
190pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);
191
192impl StateHookSender {
193 pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
194 Self(inner)
195 }
196}
197
198impl Drop for StateHookSender {
199 fn drop(&mut self) {
200 let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
202 }
203}
204
205pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
206 update.into_par_iter()
207 .filter_map(|(address, account)| {
208 if !account.is_touched() {
209 return None;
210 }
211
212 let hashed_address = keccak256(address);
213 trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
214
215 let destroyed = account.is_selfdestructed();
216 let info = if destroyed { None } else { Some(account.info.into()) };
217
218 let hashed_storage = if destroyed {
219 Some(HashedStorage::new(true))
220 } else {
221 let storage: Vec<_> = account
222 .storage
223 .into_iter()
224 .filter(|(_slot, value)| value.is_changed())
225 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
226 .collect();
227
228 if storage.is_empty() {
229 None
230 } else {
231 Some(HashedStorage::from_iter(false, storage))
232 }
233 };
234
235 Some((hashed_address, info, hashed_storage))
236 })
237 .collect()
238}
239
240#[derive(Debug)]
242struct MultiproofInput {
243 source: Option<Source>,
244 hashed_state_update: HashedPostState,
245 proof_targets: MultiProofTargets,
246 proof_sequence_number: u64,
247 state_root_message_sender: CrossbeamSender<MultiProofMessage>,
248 multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
249}
250
251impl MultiproofInput {
252 fn send_empty_proof(self) {
254 let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
255 sequence_number: self.proof_sequence_number,
256 state: self.hashed_state_update,
257 });
258 }
259}
260
261#[derive(Debug)]
272pub struct MultiproofManager {
273 proof_worker_handle: ProofWorkerHandle,
275 missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
287 proof_result_tx: CrossbeamSender<ProofResultMessage>,
290 metrics: MultiProofTaskMetrics,
292}
293
294impl MultiproofManager {
295 fn new(
297 metrics: MultiProofTaskMetrics,
298 proof_worker_handle: ProofWorkerHandle,
299 proof_result_tx: CrossbeamSender<ProofResultMessage>,
300 ) -> Self {
301 metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
303 metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
304
305 Self {
306 metrics,
307 proof_worker_handle,
308 missed_leaves_storage_roots: Default::default(),
309 proof_result_tx,
310 }
311 }
312
313 fn dispatch(&self, input: MultiproofInput) {
315 if input.proof_targets.is_empty() {
317 trace!(
318 sequence_number = input.proof_sequence_number,
319 "No proof targets, sending empty multiproof back immediately"
320 );
321 input.send_empty_proof();
322 return;
323 }
324
325 self.dispatch_multiproof(input);
326 }
327
328 fn on_calculation_complete(&self) {
330 self.metrics
331 .active_storage_workers_histogram
332 .record(self.proof_worker_handle.active_storage_workers() as f64);
333 self.metrics
334 .active_account_workers_histogram
335 .record(self.proof_worker_handle.active_account_workers() as f64);
336 self.metrics
337 .pending_storage_multiproofs_histogram
338 .record(self.proof_worker_handle.pending_storage_tasks() as f64);
339 self.metrics
340 .pending_account_multiproofs_histogram
341 .record(self.proof_worker_handle.pending_account_tasks() as f64);
342 }
343
344 fn dispatch_multiproof(&self, multiproof_input: MultiproofInput) {
346 let MultiproofInput {
347 source,
348 hashed_state_update,
349 proof_targets,
350 proof_sequence_number,
351 state_root_message_sender: _,
352 multi_added_removed_keys,
353 } = multiproof_input;
354
355 let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
356 let account_targets = proof_targets.len();
357 let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
358
359 trace!(
360 target: "engine::tree::payload_processor::multiproof",
361 proof_sequence_number,
362 ?proof_targets,
363 account_targets,
364 storage_targets,
365 ?source,
366 "Dispatching multiproof to workers"
367 );
368
369 let start = Instant::now();
370
371 let frozen_prefix_sets =
373 ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets);
374
375 let input = AccountMultiproofInput {
377 targets: proof_targets,
378 prefix_sets: frozen_prefix_sets,
379 collect_branch_node_masks: true,
380 multi_added_removed_keys,
381 missed_leaves_storage_roots,
382 proof_result_sender: ProofResultContext::new(
384 self.proof_result_tx.clone(),
385 proof_sequence_number,
386 hashed_state_update,
387 start,
388 ),
389 };
390
391 if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
392 error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof");
393 return;
394 }
395
396 self.metrics
397 .active_storage_workers_histogram
398 .record(self.proof_worker_handle.active_storage_workers() as f64);
399 self.metrics
400 .active_account_workers_histogram
401 .record(self.proof_worker_handle.active_account_workers() as f64);
402 self.metrics
403 .pending_storage_multiproofs_histogram
404 .record(self.proof_worker_handle.pending_storage_tasks() as f64);
405 self.metrics
406 .pending_account_multiproofs_histogram
407 .record(self.proof_worker_handle.pending_account_tasks() as f64);
408 }
409}
410
411#[derive(Metrics, Clone)]
412#[metrics(scope = "tree.root")]
413pub(crate) struct MultiProofTaskMetrics {
414 pub active_storage_workers_histogram: Histogram,
416 pub active_account_workers_histogram: Histogram,
418 pub max_storage_workers: Gauge,
420 pub max_account_workers: Gauge,
422 pub pending_storage_multiproofs_histogram: Histogram,
424 pub pending_account_multiproofs_histogram: Histogram,
426
427 pub prefetch_proof_targets_accounts_histogram: Histogram,
429 pub prefetch_proof_targets_storages_histogram: Histogram,
431 pub prefetch_proof_chunks_histogram: Histogram,
433
434 pub state_update_proof_targets_accounts_histogram: Histogram,
436 pub state_update_proof_targets_storages_histogram: Histogram,
438 pub state_update_proof_chunks_histogram: Histogram,
440
441 pub prefetch_batch_size_histogram: Histogram,
443 pub state_update_batch_size_histogram: Histogram,
445
446 pub proof_calculation_duration_histogram: Histogram,
448
449 pub sparse_trie_update_duration_histogram: Histogram,
451 pub sparse_trie_final_update_duration_histogram: Histogram,
453 pub sparse_trie_total_duration_histogram: Histogram,
455
456 pub state_updates_received_histogram: Histogram,
458 pub proofs_processed_histogram: Histogram,
460 pub multiproof_task_total_duration_histogram: Histogram,
462 pub first_update_wait_time_histogram: Histogram,
464 pub last_proof_wait_time_histogram: Histogram,
466}
467
468#[derive(Debug)]
568pub(super) struct MultiProofTask {
569 chunk_size: Option<usize>,
572 rx: CrossbeamReceiver<MultiProofMessage>,
574 tx: CrossbeamSender<MultiProofMessage>,
576 proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
578 to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
580 fetched_proof_targets: MultiProofTargets,
582 multi_added_removed_keys: MultiAddedRemovedKeys,
584 proof_sequencer: ProofSequencer,
586 multiproof_manager: MultiproofManager,
588 metrics: MultiProofTaskMetrics,
590 max_targets_for_chunking: usize,
594}
595
596impl MultiProofTask {
597 pub(super) fn new(
600 proof_worker_handle: ProofWorkerHandle,
601 to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
602 chunk_size: Option<usize>,
603 tx: CrossbeamSender<MultiProofMessage>,
604 rx: CrossbeamReceiver<MultiProofMessage>,
605 ) -> Self {
606 let (proof_result_tx, proof_result_rx) = unbounded();
607 let metrics = MultiProofTaskMetrics::default();
608
609 Self {
610 chunk_size,
611 rx,
612 tx,
613 proof_result_rx,
614 to_sparse_trie,
615 fetched_proof_targets: Default::default(),
616 multi_added_removed_keys: MultiAddedRemovedKeys::new(),
617 proof_sequencer: ProofSequencer::default(),
618 multiproof_manager: MultiproofManager::new(
619 metrics.clone(),
620 proof_worker_handle,
621 proof_result_tx,
622 ),
623 metrics,
624 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
625 }
626 }
627
628 pub(super) fn state_root_message_sender(&self) -> CrossbeamSender<MultiProofMessage> {
630 self.tx.clone()
631 }
632
633 #[instrument(
637 level = "debug",
638 target = "engine::tree::payload_processor::multiproof",
639 skip_all,
640 fields(accounts = targets.len(), chunks = 0)
641 )]
642 fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
643 let proof_targets = self.get_prefetch_proof_targets(targets);
644 self.fetched_proof_targets.extend_ref(&proof_targets);
645
646 self.multi_added_removed_keys.touch_accounts(proof_targets.keys().copied());
650
651 let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
653
654 self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
655 self.metrics
656 .prefetch_proof_targets_storages_histogram
657 .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
658
659 let chunking_len = proof_targets.chunking_length();
660 let available_account_workers =
661 self.multiproof_manager.proof_worker_handle.available_account_workers();
662 let available_storage_workers =
663 self.multiproof_manager.proof_worker_handle.available_storage_workers();
664 let num_chunks = dispatch_with_chunking(
665 proof_targets,
666 chunking_len,
667 self.chunk_size,
668 self.max_targets_for_chunking,
669 available_account_workers,
670 available_storage_workers,
671 MultiProofTargets::chunks,
672 |proof_targets| {
673 self.multiproof_manager.dispatch(MultiproofInput {
674 source: None,
675 hashed_state_update: Default::default(),
676 proof_targets,
677 proof_sequence_number: self.proof_sequencer.next_sequence(),
678 state_root_message_sender: self.tx.clone(),
679 multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
680 });
681 },
682 );
683 self.metrics.prefetch_proof_chunks_histogram.record(num_chunks as f64);
684
685 num_chunks as u64
686 }
687
688 fn is_done(
690 &self,
691 proofs_processed: u64,
692 state_update_proofs_requested: u64,
693 prefetch_proofs_requested: u64,
694 updates_finished: bool,
695 ) -> bool {
696 let all_proofs_processed =
697 proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
698 let no_pending = !self.proof_sequencer.has_pending();
699 trace!(
700 target: "engine::tree::payload_processor::multiproof",
701 proofs_processed,
702 state_update_proofs_requested,
703 prefetch_proofs_requested,
704 no_pending,
705 updates_finished,
706 "Checking end condition"
707 );
708 all_proofs_processed && no_pending && updates_finished
709 }
710
711 fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
713 let mut duplicates = 0;
717
718 targets.retain(|hashed_address, target_storage| {
720 let keep = self
721 .fetched_proof_targets
722 .get(hashed_address)
723 .is_none_or(|fetched_storage| {
725 !target_storage.is_subset(fetched_storage)
727 });
728
729 if !keep {
730 duplicates += target_storage.len();
731 }
732
733 keep
734 });
735
736 for (hashed_address, target_storage) in targets.deref_mut() {
738 let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
739 continue;
742 };
743
744 let prev_target_storage_len = target_storage.len();
745
746 target_storage.retain(|slot| !fetched_storage.contains(slot));
750
751 duplicates += prev_target_storage_len - target_storage.len();
752 }
753
754 if duplicates > 0 {
755 trace!(target: "engine::tree::payload_processor::multiproof", duplicates, "Removed duplicate prefetch proof targets");
756 }
757
758 targets
759 }
760
761 #[instrument(
766 level = "debug",
767 target = "engine::tree::payload_processor::multiproof",
768 skip(self, update),
769 fields(accounts = update.len(), chunks = 0)
770 )]
771 fn on_state_update(&mut self, source: Source, update: EvmState) -> u64 {
772 let hashed_state_update = evm_state_to_hashed_post_state(update);
773 self.on_hashed_state_update(source, hashed_state_update)
774 }
775
776 fn on_hashed_state_update(
780 &mut self,
781 source: Source,
782 hashed_state_update: HashedPostState,
783 ) -> u64 {
784 self.multi_added_removed_keys.update_with_state(&hashed_state_update);
786
787 let (fetched_state_update, not_fetched_state_update) = hashed_state_update
790 .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
791
792 let mut state_updates = 0;
793 if !fetched_state_update.is_empty() {
796 let _ = self.tx.send(MultiProofMessage::EmptyProof {
797 sequence_number: self.proof_sequencer.next_sequence(),
798 state: fetched_state_update,
799 });
800 state_updates += 1;
801 }
802
803 let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
805
806 let chunking_len = not_fetched_state_update.chunking_length();
807 let mut spawned_proof_targets = MultiProofTargets::default();
808 let available_account_workers =
809 self.multiproof_manager.proof_worker_handle.available_account_workers();
810 let available_storage_workers =
811 self.multiproof_manager.proof_worker_handle.available_storage_workers();
812 let num_chunks = dispatch_with_chunking(
813 not_fetched_state_update,
814 chunking_len,
815 self.chunk_size,
816 self.max_targets_for_chunking,
817 available_account_workers,
818 available_storage_workers,
819 HashedPostState::chunks,
820 |hashed_state_update| {
821 let proof_targets = get_proof_targets(
822 &hashed_state_update,
823 &self.fetched_proof_targets,
824 &multi_added_removed_keys,
825 );
826 spawned_proof_targets.extend_ref(&proof_targets);
827
828 self.multiproof_manager.dispatch(MultiproofInput {
829 source: Some(source),
830 hashed_state_update,
831 proof_targets,
832 proof_sequence_number: self.proof_sequencer.next_sequence(),
833 state_root_message_sender: self.tx.clone(),
834 multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
835 });
836 },
837 );
838 self.metrics
839 .state_update_proof_targets_accounts_histogram
840 .record(spawned_proof_targets.len() as f64);
841 self.metrics
842 .state_update_proof_targets_storages_histogram
843 .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
844 self.metrics.state_update_proof_chunks_histogram.record(num_chunks as f64);
845
846 self.fetched_proof_targets.extend(spawned_proof_targets);
847
848 state_updates + num_chunks as u64
849 }
850
851 fn on_proof(
853 &mut self,
854 sequence_number: u64,
855 update: SparseTrieUpdate,
856 ) -> Option<SparseTrieUpdate> {
857 let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
858
859 ready_proofs
860 .into_iter()
861 .reduce(|mut acc_update, update| {
863 acc_update.extend(update);
864 acc_update
865 })
866 .filter(|proof| !proof.is_empty())
868 }
869
870 fn process_multiproof_message<P>(
878 &mut self,
879 msg: MultiProofMessage,
880 ctx: &mut MultiproofBatchCtx,
881 batch_metrics: &mut MultiproofBatchMetrics,
882 provider: &CachedStateProvider<P>,
883 ) -> bool
884 where
885 P: AccountReader,
886 {
887 match msg {
888 MultiProofMessage::PrefetchProofs(targets) => {
890 trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::PrefetchProofs");
891
892 if ctx.first_update_time.is_none() {
893 self.metrics
894 .first_update_wait_time_histogram
895 .record(ctx.start.elapsed().as_secs_f64());
896 ctx.first_update_time = Some(Instant::now());
897 debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
898 }
899
900 let mut accumulated_count = targets.chunking_length();
901 ctx.accumulated_prefetch_targets.clear();
902 ctx.accumulated_prefetch_targets.push(targets);
903
904 while accumulated_count < PREFETCH_MAX_BATCH_TARGETS &&
906 ctx.accumulated_prefetch_targets.len() < PREFETCH_MAX_BATCH_MESSAGES
907 {
908 match self.rx.try_recv() {
909 Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
910 let next_count = next_targets.chunking_length();
911 if accumulated_count + next_count > PREFETCH_MAX_BATCH_TARGETS {
912 ctx.pending_msg =
913 Some(MultiProofMessage::PrefetchProofs(next_targets));
914 break;
915 }
916 accumulated_count += next_count;
917 ctx.accumulated_prefetch_targets.push(next_targets);
918 }
919 Ok(other_msg) => {
920 ctx.pending_msg = Some(other_msg);
921 break;
922 }
923 Err(_) => break,
924 }
925 }
926
927 let num_batched = ctx.accumulated_prefetch_targets.len();
929 self.metrics.prefetch_batch_size_histogram.record(num_batched as f64);
930
931 let mut accumulated_iter = ctx.accumulated_prefetch_targets.drain(..);
934 let mut merged_targets =
935 accumulated_iter.next().expect("prefetch batch always has at least one entry");
936 for next_targets in accumulated_iter {
937 merged_targets.extend(next_targets);
938 }
939
940 let account_targets = merged_targets.len();
941 let storage_targets =
942 merged_targets.values().map(|slots| slots.len()).sum::<usize>();
943 batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets);
944 trace!(
945 target: "engine::tree::payload_processor::multiproof",
946 account_targets,
947 storage_targets,
948 prefetch_proofs_requested = batch_metrics.prefetch_proofs_requested,
949 num_batched,
950 "Dispatched prefetch batch"
951 );
952
953 false
954 }
955 MultiProofMessage::StateUpdate(source, update) => {
957 trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");
958
959 if ctx.first_update_time.is_none() {
960 self.metrics
961 .first_update_wait_time_histogram
962 .record(ctx.start.elapsed().as_secs_f64());
963 ctx.first_update_time = Some(Instant::now());
964 debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
965 }
966
967 let mut accumulated_targets = estimate_evm_state_targets(&update);
969 ctx.accumulated_state_updates.clear();
970 ctx.accumulated_state_updates.push((source, update));
971
972 while accumulated_targets < STATE_UPDATE_MAX_BATCH_TARGETS {
974 match self.rx.try_recv() {
975 Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
976 let (batch_source, batch_update) = &ctx.accumulated_state_updates[0];
977 if !can_batch_state_update(
978 *batch_source,
979 batch_update,
980 next_source,
981 &next_update,
982 ) {
983 ctx.pending_msg =
984 Some(MultiProofMessage::StateUpdate(next_source, next_update));
985 break;
986 }
987
988 let next_estimate = estimate_evm_state_targets(&next_update);
989 if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS
991 {
992 ctx.pending_msg =
993 Some(MultiProofMessage::StateUpdate(next_source, next_update));
994 break;
995 }
996 accumulated_targets += next_estimate;
997 ctx.accumulated_state_updates.push((next_source, next_update));
998 }
999 Ok(other_msg) => {
1000 ctx.pending_msg = Some(other_msg);
1001 break;
1002 }
1003 Err(_) => break,
1004 }
1005 }
1006
1007 let num_batched = ctx.accumulated_state_updates.len();
1009 self.metrics.state_update_batch_size_histogram.record(num_batched as f64);
1010
1011 #[cfg(debug_assertions)]
1012 {
1013 let batch_source = ctx.accumulated_state_updates[0].0;
1014 let batch_update = &ctx.accumulated_state_updates[0].1;
1015 debug_assert!(ctx.accumulated_state_updates.iter().all(|(source, update)| {
1016 can_batch_state_update(batch_source, batch_update, *source, update)
1017 }));
1018 }
1019
1020 let mut accumulated_iter = ctx.accumulated_state_updates.drain(..);
1023 let (mut batch_source, mut merged_update) = accumulated_iter
1024 .next()
1025 .expect("state update batch always has at least one entry");
1026 for (next_source, next_update) in accumulated_iter {
1027 batch_source = next_source;
1028 merged_update.extend(next_update);
1029 }
1030
1031 let batch_len = merged_update.len();
1032 batch_metrics.state_update_proofs_requested +=
1033 self.on_state_update(batch_source, merged_update);
1034 trace!(
1035 target: "engine::tree::payload_processor::multiproof",
1036 ?batch_source,
1037 len = batch_len,
1038 state_update_proofs_requested = ?batch_metrics.state_update_proofs_requested,
1039 num_batched,
1040 "Dispatched state update batch"
1041 );
1042
1043 false
1044 }
1045 MultiProofMessage::BlockAccessList(bal) => {
1047 trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::BAL");
1048
1049 if ctx.first_update_time.is_none() {
1050 self.metrics
1051 .first_update_wait_time_histogram
1052 .record(ctx.start.elapsed().as_secs_f64());
1053 ctx.first_update_time = Some(Instant::now());
1054 debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation from BAL");
1055 }
1056
1057 match bal_to_hashed_post_state(&bal, provider) {
1059 Ok(hashed_state) => {
1060 debug!(
1061 target: "engine::tree::payload_processor::multiproof",
1062 accounts = hashed_state.accounts.len(),
1063 storages = hashed_state.storages.len(),
1064 "Processing BAL state update"
1065 );
1066
1067 batch_metrics.state_update_proofs_requested +=
1069 self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
1070 }
1071 Err(err) => {
1072 error!(target: "engine::tree::payload_processor::multiproof", ?err, "Failed to convert BAL to hashed state");
1073 return true;
1074 }
1075 }
1076
1077 ctx.updates_finished_time = Some(Instant::now());
1079
1080 if self.is_done(
1082 batch_metrics.proofs_processed,
1083 batch_metrics.state_update_proofs_requested,
1084 batch_metrics.prefetch_proofs_requested,
1085 ctx.updates_finished(),
1086 ) {
1087 debug!(
1088 target: "engine::tree::payload_processor::multiproof",
1089 "BAL processed and all proofs complete, ending calculation"
1090 );
1091 return true;
1092 }
1093 false
1094 }
1095 MultiProofMessage::FinishedStateUpdates => {
1097 trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::FinishedStateUpdates");
1098
1099 ctx.updates_finished_time = Some(Instant::now());
1100
1101 if self.is_done(
1102 batch_metrics.proofs_processed,
1103 batch_metrics.state_update_proofs_requested,
1104 batch_metrics.prefetch_proofs_requested,
1105 ctx.updates_finished(),
1106 ) {
1107 debug!(
1108 target: "engine::tree::payload_processor::multiproof",
1109 "State updates finished and all proofs processed, ending calculation"
1110 );
1111 return true;
1112 }
1113 false
1114 }
1115 MultiProofMessage::EmptyProof { sequence_number, state } => {
1117 trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::EmptyProof");
1118
1119 batch_metrics.proofs_processed += 1;
1120
1121 if let Some(combined_update) = self.on_proof(
1122 sequence_number,
1123 SparseTrieUpdate { state, multiproof: Default::default() },
1124 ) {
1125 let _ = self.to_sparse_trie.send(combined_update);
1126 }
1127
1128 if self.is_done(
1129 batch_metrics.proofs_processed,
1130 batch_metrics.state_update_proofs_requested,
1131 batch_metrics.prefetch_proofs_requested,
1132 ctx.updates_finished(),
1133 ) {
1134 debug!(
1135 target: "engine::tree::payload_processor::multiproof",
1136 "State updates finished and all proofs processed, ending calculation"
1137 );
1138 return true;
1139 }
1140 false
1141 }
1142 }
1143 }
1144
1145 #[instrument(
1182 level = "debug",
1183 name = "MultiProofTask::run",
1184 target = "engine::tree::payload_processor::multiproof",
1185 skip_all
1186 )]
1187 pub(crate) fn run<P>(mut self, provider: CachedStateProvider<P>)
1188 where
1189 P: AccountReader,
1190 {
1191 let mut ctx = MultiproofBatchCtx::new(Instant::now());
1192 let mut batch_metrics = MultiproofBatchMetrics::default();
1193
1194 'main: loop {
1197 trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop");
1198
1199 if let Some(msg) = ctx.pending_msg.take() {
1200 if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
1201 break 'main;
1202 }
1203 continue;
1204 }
1205
1206 crossbeam_channel::select_biased! {
1209 recv(self.proof_result_rx) -> proof_msg => {
1210 match proof_msg {
1211 Ok(proof_result) => {
1212 batch_metrics.proofs_processed += 1;
1213
1214 self.metrics
1215 .proof_calculation_duration_histogram
1216 .record(proof_result.elapsed);
1217
1218 self.multiproof_manager.on_calculation_complete();
1219
1220 match proof_result.result {
1222 Ok(proof_result_data) => {
1223 trace!(
1224 target: "engine::tree::payload_processor::multiproof",
1225 sequence = proof_result.sequence_number,
1226 total_proofs = batch_metrics.proofs_processed,
1227 "Processing calculated proof from worker"
1228 );
1229
1230 let update = SparseTrieUpdate {
1231 state: proof_result.state,
1232 multiproof: proof_result_data.into_multiproof(),
1233 };
1234
1235 if let Some(combined_update) =
1236 self.on_proof(proof_result.sequence_number, update)
1237 {
1238 let _ = self.to_sparse_trie.send(combined_update);
1239 }
1240 }
1241 Err(error) => {
1242 error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker");
1243 return
1244 }
1245 }
1246
1247 if self.is_done(
1248 batch_metrics.proofs_processed,
1249 batch_metrics.state_update_proofs_requested,
1250 batch_metrics.prefetch_proofs_requested,
1251 ctx.updates_finished(),
1252 ) {
1253 debug!(
1254 target: "engine::tree::payload_processor::multiproof",
1255 "State updates finished and all proofs processed, ending calculation"
1256 );
1257 break 'main
1258 }
1259 }
1260 Err(_) => {
1261 error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly");
1262 return
1263 }
1264 }
1265 },
1266 recv(self.rx) -> message => {
1267 let msg = match message {
1268 Ok(m) => m,
1269 Err(_) => {
1270 error!(target: "engine::tree::payload_processor::multiproof", "State root related message channel closed unexpectedly");
1271 return
1272 }
1273 };
1274
1275 if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
1276 break 'main;
1277 }
1278 }
1279 }
1280 }
1281
1282 debug!(
1283 target: "engine::tree::payload_processor::multiproof",
1284 total_updates = batch_metrics.state_update_proofs_requested,
1285 total_proofs = batch_metrics.proofs_processed,
1286 total_time = ?ctx.first_update_time.map(|t|t.elapsed()),
1287 time_since_updates_finished = ?ctx.updates_finished_time.map(|t|t.elapsed()),
1288 "All proofs processed, ending calculation"
1289 );
1290
1291 self.metrics
1293 .state_updates_received_histogram
1294 .record(batch_metrics.state_update_proofs_requested as f64);
1295 self.metrics.proofs_processed_histogram.record(batch_metrics.proofs_processed as f64);
1296 if let Some(total_time) = ctx.first_update_time.map(|t| t.elapsed()) {
1297 self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1298 }
1299
1300 if let Some(updates_finished_time) = ctx.updates_finished_time {
1301 self.metrics
1302 .last_proof_wait_time_histogram
1303 .record(updates_finished_time.elapsed().as_secs_f64());
1304 }
1305 }
1306}
1307
1308struct MultiproofBatchCtx {
1315 pending_msg: Option<MultiProofMessage>,
1319 first_update_time: Option<Instant>,
1321 start: Instant,
1323 updates_finished_time: Option<Instant>,
1326 accumulated_prefetch_targets: Vec<MultiProofTargets>,
1328 accumulated_state_updates: Vec<(Source, EvmState)>,
1330}
1331
1332impl MultiproofBatchCtx {
1333 fn new(start: Instant) -> Self {
1335 Self {
1336 pending_msg: None,
1337 first_update_time: None,
1338 start,
1339 updates_finished_time: None,
1340 accumulated_prefetch_targets: Vec::with_capacity(PREFETCH_MAX_BATCH_MESSAGES),
1341 accumulated_state_updates: Vec::with_capacity(STATE_UPDATE_BATCH_PREALLOC),
1342 }
1343 }
1344
1345 const fn updates_finished(&self) -> bool {
1347 self.updates_finished_time.is_some()
1348 }
1349}
1350
1351#[derive(Default)]
1353struct MultiproofBatchMetrics {
1354 proofs_processed: u64,
1356 state_update_proofs_requested: u64,
1358 prefetch_proofs_requested: u64,
1360}
1361
1362fn get_proof_targets(
1366 state_update: &HashedPostState,
1367 fetched_proof_targets: &MultiProofTargets,
1368 multi_added_removed_keys: &MultiAddedRemovedKeys,
1369) -> MultiProofTargets {
1370 let mut targets = MultiProofTargets::default();
1371
1372 for &hashed_address in state_update.accounts.keys() {
1374 if !fetched_proof_targets.contains_key(&hashed_address) {
1375 targets.insert(hashed_address, HashSet::default());
1376 }
1377 }
1378
1379 for (hashed_address, storage) in &state_update.storages {
1381 let fetched = fetched_proof_targets.get(hashed_address);
1382 let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
1383 let mut changed_slots = storage
1384 .storage
1385 .keys()
1386 .filter(|slot| {
1387 !fetched.is_some_and(|f| f.contains(*slot)) ||
1388 storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
1389 })
1390 .peekable();
1391
1392 if storage.wiped && fetched.is_none() {
1394 targets.entry(*hashed_address).or_default();
1395 }
1396
1397 if changed_slots.peek().is_some() {
1398 targets.entry(*hashed_address).or_default().extend(changed_slots);
1399 }
1400 }
1401
1402 targets
1403}
1404
1405#[allow(clippy::too_many_arguments)]
1408fn dispatch_with_chunking<T, I>(
1409 items: T,
1410 chunking_len: usize,
1411 chunk_size: Option<usize>,
1412 max_targets_for_chunking: usize,
1413 available_account_workers: usize,
1414 available_storage_workers: usize,
1415 chunker: impl FnOnce(T, usize) -> I,
1416 mut dispatch: impl FnMut(T),
1417) -> usize
1418where
1419 I: IntoIterator<Item = T>,
1420{
1421 let should_chunk = chunking_len > max_targets_for_chunking ||
1422 available_account_workers > 1 ||
1423 available_storage_workers > 1;
1424
1425 if should_chunk &&
1426 let Some(chunk_size) = chunk_size &&
1427 chunking_len > chunk_size
1428 {
1429 let mut num_chunks = 0usize;
1430 for chunk in chunker(items, chunk_size) {
1431 dispatch(chunk);
1432 num_chunks += 1;
1433 }
1434 return num_chunks;
1435 }
1436
1437 dispatch(items);
1438 1
1439}
1440
1441fn can_batch_state_update(
1447 batch_source: Source,
1448 batch_update: &EvmState,
1449 next_source: Source,
1450 next_update: &EvmState,
1451) -> bool {
1452 if !same_source(batch_source, next_source) {
1453 return false;
1454 }
1455
1456 match (batch_source, next_source) {
1457 (
1458 Source::Evm(StateChangeSource::PreBlock(_)),
1459 Source::Evm(StateChangeSource::PreBlock(_)),
1460 ) |
1461 (
1462 Source::Evm(StateChangeSource::PostBlock(_)),
1463 Source::Evm(StateChangeSource::PostBlock(_)),
1464 ) => batch_update == next_update,
1465 _ => true,
1466 }
1467}
1468
1469fn same_source(lhs: Source, rhs: Source) -> bool {
1471 match (lhs, rhs) {
1472 (
1473 Source::Evm(StateChangeSource::Transaction(a)),
1474 Source::Evm(StateChangeSource::Transaction(b)),
1475 ) => a == b,
1476 (
1477 Source::Evm(StateChangeSource::PreBlock(a)),
1478 Source::Evm(StateChangeSource::PreBlock(b)),
1479 ) => mem::discriminant(&a) == mem::discriminant(&b),
1480 (
1481 Source::Evm(StateChangeSource::PostBlock(a)),
1482 Source::Evm(StateChangeSource::PostBlock(b)),
1483 ) => mem::discriminant(&a) == mem::discriminant(&b),
1484 (Source::BlockAccessList, Source::BlockAccessList) => true,
1485 _ => false,
1486 }
1487}
1488
1489fn estimate_evm_state_targets(state: &EvmState) -> usize {
1491 state
1492 .values()
1493 .filter(|account| account.is_touched())
1494 .map(|account| {
1495 let changed_slots = account.storage.iter().filter(|(_, v)| v.is_changed()).count();
1496 1 + changed_slots
1497 })
1498 .sum()
1499}
1500
1501#[cfg(test)]
1502mod tests {
1503 use super::*;
1504 use crate::tree::cached_state::ExecutionCacheBuilder;
1505 use alloy_eip7928::{AccountChanges, BalanceChange};
1506 use alloy_primitives::{map::B256Set, Address};
1507 use reth_provider::{
1508 providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
1509 BlockReader, DatabaseProviderFactory, LatestStateProvider, PruneCheckpointReader,
1510 StageCheckpointReader, StateProviderBox, TrieReader,
1511 };
1512 use reth_trie::MultiProof;
1513 use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
1514 use revm_primitives::{B256, U256};
1515 use std::sync::{Arc, OnceLock};
1516 use tokio::runtime::{Handle, Runtime};
1517
1518 fn get_test_runtime_handle() -> Handle {
1520 static TEST_RT: OnceLock<Runtime> = OnceLock::new();
1521 TEST_RT
1522 .get_or_init(|| {
1523 tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
1524 })
1525 .handle()
1526 .clone()
1527 }
1528
1529 fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
1530 where
1531 F: DatabaseProviderFactory<
1532 Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
1533 > + Clone
1534 + Send
1535 + 'static,
1536 {
1537 let rt_handle = get_test_runtime_handle();
1538 let overlay_factory = OverlayStateProviderFactory::new(factory);
1539 let task_ctx = ProofTaskCtx::new(overlay_factory);
1540 let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1);
1541 let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
1542 let (tx, rx) = crossbeam_channel::unbounded();
1543
1544 MultiProofTask::new(proof_handle, to_sparse_trie, Some(1), tx, rx)
1545 }
1546
1547 fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
1548 where
1549 F: DatabaseProviderFactory<
1550 Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
1551 > + Clone
1552 + Send
1553 + 'static,
1554 {
1555 let db_provider = factory.database_provider_ro().unwrap();
1556 let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
1557 let cache = ExecutionCacheBuilder::default().build_caches(1000);
1558 CachedStateProvider::new(state_provider, cache, Default::default())
1559 }
1560
1561 #[test]
1562 fn test_add_proof_in_sequence() {
1563 let mut sequencer = ProofSequencer::default();
1564 let proof1 = MultiProof::default();
1565 let proof2 = MultiProof::default();
1566 sequencer.next_sequence = 2;
1567
1568 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1569 assert_eq!(ready.len(), 1);
1570 assert!(!sequencer.has_pending());
1571
1572 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1573 assert_eq!(ready.len(), 1);
1574 assert!(!sequencer.has_pending());
1575 }
1576
1577 #[test]
1578 fn test_add_proof_out_of_order() {
1579 let mut sequencer = ProofSequencer::default();
1580 let proof1 = MultiProof::default();
1581 let proof2 = MultiProof::default();
1582 let proof3 = MultiProof::default();
1583 sequencer.next_sequence = 3;
1584
1585 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1586 assert_eq!(ready.len(), 0);
1587 assert!(sequencer.has_pending());
1588
1589 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1590 assert_eq!(ready.len(), 1);
1591 assert!(sequencer.has_pending());
1592
1593 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1594 assert_eq!(ready.len(), 2);
1595 assert!(!sequencer.has_pending());
1596 }
1597
1598 #[test]
1599 fn test_add_proof_with_gaps() {
1600 let mut sequencer = ProofSequencer::default();
1601 let proof1 = MultiProof::default();
1602 let proof3 = MultiProof::default();
1603 sequencer.next_sequence = 3;
1604
1605 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1606 assert_eq!(ready.len(), 1);
1607
1608 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1609 assert_eq!(ready.len(), 0);
1610 assert!(sequencer.has_pending());
1611 }
1612
1613 #[test]
1614 fn test_add_proof_duplicate_sequence() {
1615 let mut sequencer = ProofSequencer::default();
1616 let proof1 = MultiProof::default();
1617 let proof2 = MultiProof::default();
1618
1619 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1620 assert_eq!(ready.len(), 1);
1621
1622 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1623 assert_eq!(ready.len(), 0);
1624 assert!(!sequencer.has_pending());
1625 }
1626
1627 #[test]
1628 fn test_add_proof_batch_processing() {
1629 let mut sequencer = ProofSequencer::default();
1630 let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1631 sequencer.next_sequence = 5;
1632
1633 sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
1634 sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
1635 sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
1636 sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());
1637
1638 let ready =
1639 sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
1640 assert_eq!(ready.len(), 5);
1641 assert!(!sequencer.has_pending());
1642 }
1643
1644 fn create_get_proof_targets_state() -> HashedPostState {
1645 let mut state = HashedPostState::default();
1646
1647 let addr1 = B256::random();
1648 let addr2 = B256::random();
1649 state.accounts.insert(addr1, Some(Default::default()));
1650 state.accounts.insert(addr2, Some(Default::default()));
1651
1652 let mut storage = HashedStorage::default();
1653 let slot1 = B256::random();
1654 let slot2 = B256::random();
1655 storage.storage.insert(slot1, U256::ZERO);
1656 storage.storage.insert(slot2, U256::from(1));
1657 state.storages.insert(addr1, storage);
1658
1659 state
1660 }
1661
1662 #[test]
1663 fn test_get_proof_targets_new_account_targets() {
1664 let state = create_get_proof_targets_state();
1665 let fetched = MultiProofTargets::default();
1666
1667 let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1668
1669 assert_eq!(targets.len(), state.accounts.len());
1671 for addr in state.accounts.keys() {
1672 assert!(targets.contains_key(addr));
1673 }
1674 }
1675
1676 #[test]
1677 fn test_get_proof_targets_new_storage_targets() {
1678 let state = create_get_proof_targets_state();
1679 let fetched = MultiProofTargets::default();
1680
1681 let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1682
1683 for (addr, storage) in &state.storages {
1685 assert!(targets.contains_key(addr));
1686 let target_slots = &targets[addr];
1687 assert_eq!(target_slots.len(), storage.storage.len());
1688 for slot in storage.storage.keys() {
1689 assert!(target_slots.contains(slot));
1690 }
1691 }
1692 }
1693
1694 #[test]
1695 fn test_get_proof_targets_filter_already_fetched_accounts() {
1696 let state = create_get_proof_targets_state();
1697 let mut fetched = MultiProofTargets::default();
1698
1699 let fetched_addr = state
1701 .accounts
1702 .keys()
1703 .find(|&&addr| !state.storages.contains_key(&addr))
1704 .expect("Should have an account without storage");
1705
1706 fetched.insert(*fetched_addr, HashSet::default());
1708
1709 let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1710
1711 assert!(!targets.contains_key(fetched_addr));
1713 assert_eq!(targets.len(), state.accounts.len() - 1);
1715 }
1716
1717 #[test]
1718 fn test_get_proof_targets_filter_already_fetched_storage() {
1719 let state = create_get_proof_targets_state();
1720 let mut fetched = MultiProofTargets::default();
1721
1722 let (addr, storage) = state.storages.iter().next().unwrap();
1724 let mut fetched_slots = HashSet::default();
1725 let fetched_slot = *storage.storage.keys().next().unwrap();
1726 fetched_slots.insert(fetched_slot);
1727 fetched.insert(*addr, fetched_slots);
1728
1729 let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1730
1731 let target_slots = &targets[addr];
1733 assert!(!target_slots.contains(&fetched_slot));
1734 assert_eq!(target_slots.len(), storage.storage.len() - 1);
1735 }
1736
1737 #[test]
1738 fn test_get_proof_targets_empty_state() {
1739 let state = HashedPostState::default();
1740 let fetched = MultiProofTargets::default();
1741
1742 let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1743
1744 assert!(targets.is_empty());
1745 }
1746
1747 #[test]
1748 fn test_get_proof_targets_mixed_fetched_state() {
1749 let mut state = HashedPostState::default();
1750 let mut fetched = MultiProofTargets::default();
1751
1752 let addr1 = B256::random();
1753 let addr2 = B256::random();
1754 let slot1 = B256::random();
1755 let slot2 = B256::random();
1756
1757 state.accounts.insert(addr1, Some(Default::default()));
1758 state.accounts.insert(addr2, Some(Default::default()));
1759
1760 let mut storage = HashedStorage::default();
1761 storage.storage.insert(slot1, U256::ZERO);
1762 storage.storage.insert(slot2, U256::from(1));
1763 state.storages.insert(addr1, storage);
1764
1765 let mut fetched_slots = HashSet::default();
1766 fetched_slots.insert(slot1);
1767 fetched.insert(addr1, fetched_slots);
1768
1769 let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1770
1771 assert!(targets.contains_key(&addr2));
1772 assert!(!targets[&addr1].contains(&slot1));
1773 assert!(targets[&addr1].contains(&slot2));
1774 }
1775
1776 #[test]
1777 fn test_get_proof_targets_unmodified_account_with_storage() {
1778 let mut state = HashedPostState::default();
1779 let fetched = MultiProofTargets::default();
1780
1781 let addr = B256::random();
1782 let slot1 = B256::random();
1783 let slot2 = B256::random();
1784
1785 let mut storage = HashedStorage::default();
1788 storage.storage.insert(slot1, U256::from(1));
1789 storage.storage.insert(slot2, U256::from(2));
1790 state.storages.insert(addr, storage);
1791
1792 assert!(!state.accounts.contains_key(&addr));
1793 assert!(!fetched.contains_key(&addr));
1794
1795 let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1796
1797 assert!(targets.contains_key(&addr));
1799
1800 let target_slots = &targets[&addr];
1801 assert_eq!(target_slots.len(), 2);
1802 assert!(target_slots.contains(&slot1));
1803 assert!(target_slots.contains(&slot2));
1804 }
1805
1806 #[test]
1807 fn test_get_prefetch_proof_targets_no_duplicates() {
1808 let test_provider_factory = create_test_provider_factory();
1809 let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1810
1811 let mut targets = MultiProofTargets::default();
1813 let addr1 = B256::random();
1814 let addr2 = B256::random();
1815 let slot1 = B256::random();
1816 let slot2 = B256::random();
1817 targets.insert(addr1, std::iter::once(slot1).collect());
1818 targets.insert(addr2, std::iter::once(slot2).collect());
1819
1820 let prefetch_proof_targets =
1821 test_state_root_task.get_prefetch_proof_targets(targets.clone());
1822
1823 assert_eq!(prefetch_proof_targets, targets);
1826
1827 let addr3 = B256::random();
1829 let slot3 = B256::random();
1830 test_state_root_task.fetched_proof_targets.insert(addr3, std::iter::once(slot3).collect());
1831
1832 let prefetch_proof_targets =
1833 test_state_root_task.get_prefetch_proof_targets(targets.clone());
1834
1835 assert_eq!(prefetch_proof_targets, targets);
1838 }
1839
1840 #[test]
1841 fn test_get_prefetch_proof_targets_remove_subset() {
1842 let test_provider_factory = create_test_provider_factory();
1843 let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1844
1845 let mut targets = MultiProofTargets::default();
1847 let addr1 = B256::random();
1848 let addr2 = B256::random();
1849 let slot1 = B256::random();
1850 let slot2 = B256::random();
1851 targets.insert(addr1, std::iter::once(slot1).collect());
1852 targets.insert(addr2, std::iter::once(slot2).collect());
1853
1854 test_state_root_task.fetched_proof_targets.insert(addr1, std::iter::once(slot1).collect());
1856
1857 let prefetch_proof_targets =
1858 test_state_root_task.get_prefetch_proof_targets(targets.clone());
1859
1860 assert_eq!(prefetch_proof_targets.len(), 1);
1862 assert!(!prefetch_proof_targets.contains_key(&addr1));
1863 assert!(prefetch_proof_targets.contains_key(&addr2));
1864
1865 let slot3 = B256::random();
1867 targets.get_mut(&addr1).unwrap().insert(slot3);
1868
1869 let prefetch_proof_targets =
1870 test_state_root_task.get_prefetch_proof_targets(targets.clone());
1871
1872 assert_eq!(prefetch_proof_targets.len(), 2);
1875 assert!(prefetch_proof_targets.contains_key(&addr1));
1876 assert_eq!(
1877 *prefetch_proof_targets.get(&addr1).unwrap(),
1878 std::iter::once(slot3).collect::<B256Set>()
1879 );
1880 assert!(prefetch_proof_targets.contains_key(&addr2));
1881 assert_eq!(
1882 *prefetch_proof_targets.get(&addr2).unwrap(),
1883 std::iter::once(slot2).collect::<B256Set>()
1884 );
1885 }
1886
1887 #[test]
1888 fn test_get_proof_targets_with_removed_storage_keys() {
1889 let mut state = HashedPostState::default();
1890 let mut fetched = MultiProofTargets::default();
1891 let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1892
1893 let addr = B256::random();
1894 let slot1 = B256::random();
1895 let slot2 = B256::random();
1896
1897 state.accounts.insert(addr, Some(Default::default()));
1899
1900 let mut storage = HashedStorage::default();
1902 storage.storage.insert(slot1, U256::from(100));
1903 storage.storage.insert(slot2, U256::from(200));
1904 state.storages.insert(addr, storage);
1905
1906 let mut fetched_slots = HashSet::default();
1908 fetched_slots.insert(slot1);
1909 fetched.insert(addr, fetched_slots);
1910
1911 let mut removed_state = HashedPostState::default();
1913 let mut removed_storage = HashedStorage::default();
1914 removed_storage.storage.insert(slot1, U256::ZERO); removed_state.storages.insert(addr, removed_storage);
1916 multi_added_removed_keys.update_with_state(&removed_state);
1917
1918 let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1919
1920 assert!(targets.contains_key(&addr));
1922 let target_slots = &targets[&addr];
1923 assert_eq!(target_slots.len(), 2);
1924 assert!(target_slots.contains(&slot1)); assert!(target_slots.contains(&slot2)); }
1927
1928 #[test]
1929 fn test_get_proof_targets_with_wiped_storage() {
1930 let mut state = HashedPostState::default();
1931 let fetched = MultiProofTargets::default();
1932 let multi_added_removed_keys = MultiAddedRemovedKeys::new();
1933
1934 let addr = B256::random();
1935 let slot1 = B256::random();
1936
1937 state.accounts.insert(addr, Some(Default::default()));
1939
1940 let mut storage = HashedStorage::new(true);
1942 storage.storage.insert(slot1, U256::from(100));
1943 state.storages.insert(addr, storage);
1944
1945 let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1946
1947 assert!(targets.contains_key(&addr));
1949 let target_slots = &targets[&addr];
1950 assert_eq!(target_slots.len(), 1);
1951 assert!(target_slots.contains(&slot1));
1952 }
1953
1954 #[test]
1955 fn test_get_proof_targets_removed_keys_not_in_state_update() {
1956 let mut state = HashedPostState::default();
1957 let mut fetched = MultiProofTargets::default();
1958 let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1959
1960 let addr = B256::random();
1961 let slot1 = B256::random();
1962 let slot2 = B256::random();
1963 let slot3 = B256::random();
1964
1965 state.accounts.insert(addr, Some(Default::default()));
1967
1968 let mut storage = HashedStorage::default();
1970 storage.storage.insert(slot1, U256::from(100));
1971 storage.storage.insert(slot2, U256::from(200));
1972 state.storages.insert(addr, storage);
1973
1974 let mut fetched_slots = HashSet::default();
1976 fetched_slots.insert(slot1);
1977 fetched_slots.insert(slot2);
1978 fetched_slots.insert(slot3); fetched.insert(addr, fetched_slots);
1980
1981 let mut removed_state = HashedPostState::default();
1983 let mut removed_storage = HashedStorage::default();
1984 removed_storage.storage.insert(slot3, U256::ZERO);
1985 removed_state.storages.insert(addr, removed_storage);
1986 multi_added_removed_keys.update_with_state(&removed_state);
1987
1988 let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1989
1990 assert!(!targets.contains_key(&addr));
1992 }
1993
1994 #[test]
1996 fn test_prefetch_proofs_batching() {
1997 let test_provider_factory = create_test_provider_factory();
1998 let mut task = create_test_state_root_task(test_provider_factory);
1999
2000 let addr1 = B256::random();
2002 let addr2 = B256::random();
2003 let addr3 = B256::random();
2004
2005 let mut targets1 = MultiProofTargets::default();
2006 targets1.insert(addr1, HashSet::default());
2007
2008 let mut targets2 = MultiProofTargets::default();
2009 targets2.insert(addr2, HashSet::default());
2010
2011 let mut targets3 = MultiProofTargets::default();
2012 targets3.insert(addr3, HashSet::default());
2013
2014 let tx = task.state_root_message_sender();
2015 tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
2016 tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
2017 tx.send(MultiProofMessage::PrefetchProofs(targets3)).unwrap();
2018
2019 let proofs_requested =
2020 if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
2021 let mut merged_targets = targets;
2023 let mut num_batched = 1;
2024 while let Ok(MultiProofMessage::PrefetchProofs(next_targets)) = task.rx.try_recv() {
2025 merged_targets.extend(next_targets);
2026 num_batched += 1;
2027 }
2028
2029 assert_eq!(num_batched, 3);
2030 assert_eq!(merged_targets.len(), 3);
2031 assert!(merged_targets.contains_key(&addr1));
2032 assert!(merged_targets.contains_key(&addr2));
2033 assert!(merged_targets.contains_key(&addr3));
2034
2035 task.on_prefetch_proof(merged_targets)
2036 } else {
2037 panic!("Expected PrefetchProofs message");
2038 };
2039
2040 assert_eq!(proofs_requested, 1);
2041 }
2042
2043 #[test]
2045 fn test_state_update_batching() {
2046 use alloy_evm::block::StateChangeSource;
2047 use revm_state::Account;
2048
2049 let test_provider_factory = create_test_provider_factory();
2050 let mut task = create_test_state_root_task(test_provider_factory);
2051
2052 let addr1 = alloy_primitives::Address::random();
2054 let addr2 = alloy_primitives::Address::random();
2055
2056 let mut update1 = EvmState::default();
2057 update1.insert(
2058 addr1,
2059 Account {
2060 info: revm_state::AccountInfo {
2061 balance: U256::from(100),
2062 nonce: 1,
2063 code_hash: Default::default(),
2064 code: Default::default(),
2065 },
2066 transaction_id: Default::default(),
2067 storage: Default::default(),
2068 status: revm_state::AccountStatus::Touched,
2069 },
2070 );
2071
2072 let mut update2 = EvmState::default();
2073 update2.insert(
2074 addr2,
2075 Account {
2076 info: revm_state::AccountInfo {
2077 balance: U256::from(200),
2078 nonce: 2,
2079 code_hash: Default::default(),
2080 code: Default::default(),
2081 },
2082 transaction_id: Default::default(),
2083 storage: Default::default(),
2084 status: revm_state::AccountStatus::Touched,
2085 },
2086 );
2087
2088 let source = StateChangeSource::Transaction(0);
2089
2090 let tx = task.state_root_message_sender();
2091 tx.send(MultiProofMessage::StateUpdate(source.into(), update1.clone())).unwrap();
2092 tx.send(MultiProofMessage::StateUpdate(source.into(), update2.clone())).unwrap();
2093
2094 let proofs_requested =
2095 if let Ok(MultiProofMessage::StateUpdate(_src, update)) = task.rx.recv() {
2096 let mut merged_update = update;
2097 let mut num_batched = 1;
2098
2099 while let Ok(MultiProofMessage::StateUpdate(_next_source, next_update)) =
2100 task.rx.try_recv()
2101 {
2102 merged_update.extend(next_update);
2103 num_batched += 1;
2104 }
2105
2106 assert_eq!(num_batched, 2);
2107 assert_eq!(merged_update.len(), 2);
2108 assert!(merged_update.contains_key(&addr1));
2109 assert!(merged_update.contains_key(&addr2));
2110
2111 task.on_state_update(source.into(), merged_update)
2112 } else {
2113 panic!("Expected StateUpdate message");
2114 };
2115 assert_eq!(proofs_requested, 1);
2116 }
2117
2118 #[test]
2120 fn test_state_update_batching_separates_sources() {
2121 use alloy_evm::block::StateChangeSource;
2122 use revm_state::Account;
2123
2124 let test_provider_factory = create_test_provider_factory();
2125 let task = create_test_state_root_task(test_provider_factory);
2126
2127 let addr_a1 = alloy_primitives::Address::random();
2128 let addr_b1 = alloy_primitives::Address::random();
2129 let addr_a2 = alloy_primitives::Address::random();
2130
2131 let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
2132 let mut state = EvmState::default();
2133 state.insert(
2134 addr,
2135 Account {
2136 info: revm_state::AccountInfo {
2137 balance: U256::from(balance),
2138 nonce: 1,
2139 code_hash: Default::default(),
2140 code: Default::default(),
2141 },
2142 transaction_id: Default::default(),
2143 storage: Default::default(),
2144 status: revm_state::AccountStatus::Touched,
2145 },
2146 );
2147 state
2148 };
2149
2150 let source_a = StateChangeSource::Transaction(1);
2151 let source_b = StateChangeSource::Transaction(2);
2152
2153 let tx = task.state_root_message_sender();
2155 tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a1, 100)))
2156 .unwrap();
2157 tx.send(MultiProofMessage::StateUpdate(source_b.into(), create_state_update(addr_b1, 200)))
2158 .unwrap();
2159 tx.send(MultiProofMessage::StateUpdate(source_a.into(), create_state_update(addr_a2, 300)))
2160 .unwrap();
2161
2162 let mut pending_msg: Option<MultiProofMessage> = None;
2163
2164 if let Ok(MultiProofMessage::StateUpdate(first_source, _)) = task.rx.recv() {
2165 assert!(same_source(first_source, source_a.into()));
2166
2167 let mut accumulated_updates: Vec<(Source, EvmState)> = Vec::new();
2169 let mut accumulated_targets = 0usize;
2170
2171 loop {
2172 if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
2173 break;
2174 }
2175 match task.rx.try_recv() {
2176 Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
2177 if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
2178 !can_batch_state_update(
2179 *batch_source,
2180 batch_update,
2181 next_source,
2182 &next_update,
2183 )
2184 {
2185 pending_msg =
2186 Some(MultiProofMessage::StateUpdate(next_source, next_update));
2187 break;
2188 }
2189
2190 let next_estimate = estimate_evm_state_targets(&next_update);
2191 if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
2192 pending_msg =
2193 Some(MultiProofMessage::StateUpdate(next_source, next_update));
2194 break;
2195 }
2196 if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
2197 !accumulated_updates.is_empty()
2198 {
2199 pending_msg =
2200 Some(MultiProofMessage::StateUpdate(next_source, next_update));
2201 break;
2202 }
2203 accumulated_targets += next_estimate;
2204 accumulated_updates.push((next_source, next_update));
2205 }
2206 Ok(other_msg) => {
2207 pending_msg = Some(other_msg);
2208 break;
2209 }
2210 Err(_) => break,
2211 }
2212 }
2213
2214 assert_eq!(accumulated_updates.len(), 1, "Should only batch matching sources");
2215 let batch_source = accumulated_updates[0].0;
2216 assert!(same_source(batch_source, source_b.into()));
2217
2218 let batch_source = accumulated_updates[0].0;
2219 let mut merged_update = accumulated_updates.remove(0).1;
2220 for (_, next_update) in accumulated_updates {
2221 merged_update.extend(next_update);
2222 }
2223
2224 assert!(same_source(batch_source, source_b.into()), "Batch should use matching source");
2225 assert!(merged_update.contains_key(&addr_b1));
2226 assert!(!merged_update.contains_key(&addr_a1));
2227 assert!(!merged_update.contains_key(&addr_a2));
2228 } else {
2229 panic!("Expected first StateUpdate");
2230 }
2231
2232 match pending_msg {
2233 Some(MultiProofMessage::StateUpdate(pending_source, pending_update)) => {
2234 assert!(same_source(pending_source, source_a.into()));
2235 assert!(pending_update.contains_key(&addr_a2));
2236 }
2237 other => panic!("Expected pending StateUpdate with source_a, got {:?}", other),
2238 }
2239 }
2240
2241 #[test]
2243 fn test_pre_block_updates_require_payload_match_to_batch() {
2244 use alloy_evm::block::{StateChangePreBlockSource, StateChangeSource};
2245 use revm_state::Account;
2246
2247 let test_provider_factory = create_test_provider_factory();
2248 let task = create_test_state_root_task(test_provider_factory);
2249
2250 let addr1 = alloy_primitives::Address::random();
2251 let addr2 = alloy_primitives::Address::random();
2252 let addr3 = alloy_primitives::Address::random();
2253
2254 let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
2255 let mut state = EvmState::default();
2256 state.insert(
2257 addr,
2258 Account {
2259 info: revm_state::AccountInfo {
2260 balance: U256::from(balance),
2261 nonce: 1,
2262 code_hash: Default::default(),
2263 code: Default::default(),
2264 },
2265 transaction_id: Default::default(),
2266 storage: Default::default(),
2267 status: revm_state::AccountStatus::Touched,
2268 },
2269 );
2270 state
2271 };
2272
2273 let source = StateChangeSource::PreBlock(StateChangePreBlockSource::BeaconRootContract);
2274
2275 let tx = task.state_root_message_sender();
2277 tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr1, 100)))
2278 .unwrap();
2279 tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr2, 200)))
2280 .unwrap();
2281 tx.send(MultiProofMessage::StateUpdate(source.into(), create_state_update(addr3, 300)))
2282 .unwrap();
2283
2284 let mut pending_msg: Option<MultiProofMessage> = None;
2285
2286 if let Ok(MultiProofMessage::StateUpdate(first_source, first_update)) = task.rx.recv() {
2287 assert!(same_source(first_source, source.into()));
2288 assert!(first_update.contains_key(&addr1));
2289
2290 let mut accumulated_updates: Vec<(Source, EvmState)> = Vec::new();
2291 let mut accumulated_targets = 0usize;
2292
2293 loop {
2294 if accumulated_targets >= STATE_UPDATE_MAX_BATCH_TARGETS {
2295 break;
2296 }
2297 match task.rx.try_recv() {
2298 Ok(MultiProofMessage::StateUpdate(next_source, next_update)) => {
2299 if let Some((batch_source, batch_update)) = accumulated_updates.first() &&
2300 !can_batch_state_update(
2301 *batch_source,
2302 batch_update,
2303 next_source,
2304 &next_update,
2305 )
2306 {
2307 pending_msg =
2308 Some(MultiProofMessage::StateUpdate(next_source, next_update));
2309 break;
2310 }
2311
2312 let next_estimate = estimate_evm_state_targets(&next_update);
2313 if next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS {
2314 pending_msg =
2315 Some(MultiProofMessage::StateUpdate(next_source, next_update));
2316 break;
2317 }
2318 if accumulated_targets + next_estimate > STATE_UPDATE_MAX_BATCH_TARGETS &&
2319 !accumulated_updates.is_empty()
2320 {
2321 pending_msg =
2322 Some(MultiProofMessage::StateUpdate(next_source, next_update));
2323 break;
2324 }
2325 accumulated_targets += next_estimate;
2326 accumulated_updates.push((next_source, next_update));
2327 }
2328 Ok(other_msg) => {
2329 pending_msg = Some(other_msg);
2330 break;
2331 }
2332 Err(_) => break,
2333 }
2334 }
2335
2336 assert_eq!(
2337 accumulated_updates.len(),
2338 1,
2339 "Second pre-block update should not merge with a different payload"
2340 );
2341 let (batched_source, batched_update) = accumulated_updates.remove(0);
2342 assert!(same_source(batched_source, source.into()));
2343 assert!(batched_update.contains_key(&addr2));
2344 assert!(!batched_update.contains_key(&addr3));
2345
2346 match pending_msg {
2347 Some(MultiProofMessage::StateUpdate(_, pending_update)) => {
2348 assert!(pending_update.contains_key(&addr3));
2349 }
2350 other => panic!("Expected pending third pre-block update, got {:?}", other),
2351 }
2352 } else {
2353 panic!("Expected first StateUpdate");
2354 }
2355 }
2356
2357 #[test]
2359 fn test_batching_preserves_ordering_with_different_message_type() {
2360 use alloy_evm::block::StateChangeSource;
2361 use revm_state::Account;
2362
2363 let test_provider_factory = create_test_provider_factory();
2364 let task = create_test_state_root_task(test_provider_factory);
2365
2366 let addr1 = B256::random();
2367 let addr2 = B256::random();
2368 let addr3 = B256::random();
2369 let state_addr1 = alloy_primitives::Address::random();
2370 let state_addr2 = alloy_primitives::Address::random();
2371
2372 let mut targets1 = MultiProofTargets::default();
2374 targets1.insert(addr1, HashSet::default());
2375
2376 let mut targets2 = MultiProofTargets::default();
2377 targets2.insert(addr2, HashSet::default());
2378
2379 let mut targets3 = MultiProofTargets::default();
2380 targets3.insert(addr3, HashSet::default());
2381
2382 let mut state_update1 = EvmState::default();
2384 state_update1.insert(
2385 state_addr1,
2386 Account {
2387 info: revm_state::AccountInfo {
2388 balance: U256::from(100),
2389 nonce: 1,
2390 code_hash: Default::default(),
2391 code: Default::default(),
2392 },
2393 transaction_id: Default::default(),
2394 storage: Default::default(),
2395 status: revm_state::AccountStatus::Touched,
2396 },
2397 );
2398
2399 let mut state_update2 = EvmState::default();
2401 state_update2.insert(
2402 state_addr2,
2403 Account {
2404 info: revm_state::AccountInfo {
2405 balance: U256::from(200),
2406 nonce: 2,
2407 code_hash: Default::default(),
2408 code: Default::default(),
2409 },
2410 transaction_id: Default::default(),
2411 storage: Default::default(),
2412 status: revm_state::AccountStatus::Touched,
2413 },
2414 );
2415
2416 let source = StateChangeSource::Transaction(42);
2417
2418 let tx = task.state_root_message_sender();
2420 tx.send(MultiProofMessage::PrefetchProofs(targets1)).unwrap();
2421 tx.send(MultiProofMessage::PrefetchProofs(targets2)).unwrap();
2422 tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap();
2423 tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap();
2424 tx.send(MultiProofMessage::PrefetchProofs(targets3.clone())).unwrap();
2425
2426 let mut pending_msg: Option<MultiProofMessage> = None;
2428 if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
2429 let mut merged_targets = targets;
2430 let mut num_batched = 1;
2431
2432 loop {
2433 match task.rx.try_recv() {
2434 Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
2435 merged_targets.extend(next_targets);
2436 num_batched += 1;
2437 }
2438 Ok(other_msg) => {
2439 pending_msg = Some(other_msg);
2441 break;
2442 }
2443 Err(_) => break,
2444 }
2445 }
2446
2447 assert_eq!(num_batched, 2, "Should batch only until different message type");
2449 assert_eq!(merged_targets.len(), 2);
2450 assert!(merged_targets.contains_key(&addr1));
2451 assert!(merged_targets.contains_key(&addr2));
2452 assert!(!merged_targets.contains_key(&addr3), "addr3 should NOT be in first batch");
2453 } else {
2454 panic!("Expected PrefetchProofs message");
2455 }
2456
2457 match pending_msg {
2459 Some(MultiProofMessage::StateUpdate(_src, update)) => {
2460 assert!(update.contains_key(&state_addr1), "Should be first StateUpdate");
2461 }
2462 _ => panic!("StateUpdate1 was lost or reordered! The ordering fix is broken."),
2463 }
2464
2465 match task.rx.try_recv() {
2467 Ok(MultiProofMessage::StateUpdate(_src, update)) => {
2468 assert!(update.contains_key(&state_addr2), "Should be second StateUpdate");
2469 }
2470 _ => panic!("StateUpdate2 was lost!"),
2471 }
2472
2473 match task.rx.try_recv() {
2475 Ok(MultiProofMessage::PrefetchProofs(targets)) => {
2476 assert_eq!(targets.len(), 1);
2477 assert!(targets.contains_key(&addr3));
2478 }
2479 _ => panic!("PrefetchProofs3 was lost!"),
2480 }
2481 }
2482
2483 #[test]
2485 fn test_pending_message_processed_before_next_iteration() {
2486 use alloy_evm::block::StateChangeSource;
2487 use revm_state::Account;
2488
2489 let test_provider_factory = create_test_provider_factory();
2490 let test_provider = create_cached_provider(test_provider_factory.clone());
2491 let mut task = create_test_state_root_task(test_provider_factory);
2492
2493 let prefetch_addr1 = B256::random();
2495 let prefetch_addr2 = B256::random();
2496 let mut prefetch1 = MultiProofTargets::default();
2497 prefetch1.insert(prefetch_addr1, HashSet::default());
2498 let mut prefetch2 = MultiProofTargets::default();
2499 prefetch2.insert(prefetch_addr2, HashSet::default());
2500
2501 let state_addr = alloy_primitives::Address::random();
2502 let mut state_update = EvmState::default();
2503 state_update.insert(
2504 state_addr,
2505 Account {
2506 info: revm_state::AccountInfo {
2507 balance: U256::from(42),
2508 nonce: 1,
2509 code_hash: Default::default(),
2510 code: Default::default(),
2511 },
2512 transaction_id: Default::default(),
2513 storage: Default::default(),
2514 status: revm_state::AccountStatus::Touched,
2515 },
2516 );
2517
2518 let source = StateChangeSource::Transaction(99);
2519
2520 let tx = task.state_root_message_sender();
2521 tx.send(MultiProofMessage::PrefetchProofs(prefetch1)).unwrap();
2522 tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap();
2523 tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap();
2524
2525 let mut ctx = MultiproofBatchCtx::new(Instant::now());
2526 let mut batch_metrics = MultiproofBatchMetrics::default();
2527
2528 let first = task.rx.recv().unwrap();
2530 assert!(matches!(first, MultiProofMessage::PrefetchProofs(_)));
2531 assert!(!task.process_multiproof_message(
2532 first,
2533 &mut ctx,
2534 &mut batch_metrics,
2535 &test_provider
2536 ));
2537 let pending = ctx.pending_msg.take().expect("pending message captured");
2538 assert!(matches!(pending, MultiProofMessage::StateUpdate(_, _)));
2539
2540 assert!(!task.process_multiproof_message(
2542 pending,
2543 &mut ctx,
2544 &mut batch_metrics,
2545 &test_provider
2546 ));
2547
2548 match ctx.pending_msg.take() {
2550 Some(MultiProofMessage::PrefetchProofs(targets)) => {
2551 assert_eq!(targets.len(), 1);
2552 assert!(targets.contains_key(&prefetch_addr2));
2553 }
2554 other => panic!("Expected remaining PrefetchProofs2 in pending_msg, got {:?}", other),
2555 }
2556 }
2557
2558 #[test]
2560 fn test_pending_messages_get_full_batching_treatment() {
2561 use alloy_evm::block::StateChangeSource;
2575 use revm_state::Account;
2576
2577 let test_provider_factory = create_test_provider_factory();
2578 let task = create_test_state_root_task(test_provider_factory);
2579
2580 let prefetch_addr1 = B256::random();
2581 let prefetch_addr2 = B256::random();
2582 let state_addr1 = alloy_primitives::Address::random();
2583 let state_addr2 = alloy_primitives::Address::random();
2584 let state_addr3 = alloy_primitives::Address::random();
2585
2586 let mut prefetch1 = MultiProofTargets::default();
2588 prefetch1.insert(prefetch_addr1, HashSet::default());
2589
2590 let mut prefetch2 = MultiProofTargets::default();
2591 prefetch2.insert(prefetch_addr2, HashSet::default());
2592
2593 let create_state_update = |addr: alloy_primitives::Address, balance: u64| {
2595 let mut state = EvmState::default();
2596 state.insert(
2597 addr,
2598 Account {
2599 info: revm_state::AccountInfo {
2600 balance: U256::from(balance),
2601 nonce: 1,
2602 code_hash: Default::default(),
2603 code: Default::default(),
2604 },
2605 transaction_id: Default::default(),
2606 storage: Default::default(),
2607 status: revm_state::AccountStatus::Touched,
2608 },
2609 );
2610 state
2611 };
2612
2613 let source = StateChangeSource::Transaction(42);
2614
2615 let tx = task.state_root_message_sender();
2617 tx.send(MultiProofMessage::PrefetchProofs(prefetch1.clone())).unwrap();
2618 tx.send(MultiProofMessage::StateUpdate(
2619 source.into(),
2620 create_state_update(state_addr1, 100),
2621 ))
2622 .unwrap();
2623 tx.send(MultiProofMessage::StateUpdate(
2624 source.into(),
2625 create_state_update(state_addr2, 200),
2626 ))
2627 .unwrap();
2628 tx.send(MultiProofMessage::StateUpdate(
2629 source.into(),
2630 create_state_update(state_addr3, 300),
2631 ))
2632 .unwrap();
2633 tx.send(MultiProofMessage::PrefetchProofs(prefetch2.clone())).unwrap();
2634
2635 let mut pending_msg: Option<MultiProofMessage> = None;
2637
2638 if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
2640 let mut merged_targets = targets;
2641 loop {
2642 match task.rx.try_recv() {
2643 Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
2644 merged_targets.extend(next_targets);
2645 }
2646 Ok(other_msg) => {
2647 pending_msg = Some(other_msg);
2648 break;
2649 }
2650 Err(_) => break,
2651 }
2652 }
2653 assert_eq!(merged_targets.len(), 1);
2655 assert!(merged_targets.contains_key(&prefetch_addr1));
2656 } else {
2657 panic!("Expected PrefetchProofs");
2658 }
2659
2660 assert!(matches!(pending_msg, Some(MultiProofMessage::StateUpdate(_, _))));
2662
2663 if let Some(MultiProofMessage::StateUpdate(_src, first_update)) = pending_msg.take() {
2666 let mut merged_update = first_update;
2667 let mut num_batched = 1;
2668
2669 loop {
2670 match task.rx.try_recv() {
2671 Ok(MultiProofMessage::StateUpdate(_src, next_update)) => {
2672 merged_update.extend(next_update);
2673 num_batched += 1;
2674 }
2675 Ok(other_msg) => {
2676 pending_msg = Some(other_msg);
2677 break;
2678 }
2679 Err(_) => break,
2680 }
2681 }
2682
2683 assert_eq!(
2685 num_batched, 3,
2686 "Pending message should get full batching treatment and merge all 3 StateUpdates"
2687 );
2688 assert_eq!(merged_update.len(), 3, "Should have all 3 addresses in merged update");
2689 assert!(merged_update.contains_key(&state_addr1));
2690 assert!(merged_update.contains_key(&state_addr2));
2691 assert!(merged_update.contains_key(&state_addr3));
2692 } else {
2693 panic!("Expected pending StateUpdate");
2694 }
2695
2696 match pending_msg {
2698 Some(MultiProofMessage::PrefetchProofs(targets)) => {
2699 assert_eq!(targets.len(), 1);
2700 assert!(targets.contains_key(&prefetch_addr2));
2701 }
2702 _ => panic!("Prefetch2 was lost!"),
2703 }
2704 }
2705
2706 #[test]
2708 fn test_bal_message_processing() {
2709 let test_provider_factory = create_test_provider_factory();
2710 let test_provider = create_cached_provider(test_provider_factory.clone());
2711 let mut task = create_test_state_root_task(test_provider_factory);
2712
2713 let account_address = Address::random();
2715 let account_changes = AccountChanges {
2716 address: account_address,
2717 balance_changes: vec![BalanceChange::new(0, U256::from(1000))],
2718 nonce_changes: vec![],
2719 code_changes: vec![],
2720 storage_changes: vec![],
2721 storage_reads: vec![],
2722 };
2723
2724 let bal = Arc::new(vec![account_changes]);
2725
2726 let mut ctx = MultiproofBatchCtx::new(Instant::now());
2727 let mut batch_metrics = MultiproofBatchMetrics::default();
2728
2729 let should_finish = task.process_multiproof_message(
2730 MultiProofMessage::BlockAccessList(bal),
2731 &mut ctx,
2732 &mut batch_metrics,
2733 &test_provider,
2734 );
2735
2736 assert!(ctx.updates_finished_time.is_some());
2738
2739 assert!(batch_metrics.state_update_proofs_requested > 0);
2741
2742 assert!(!should_finish, "Should continue waiting for proofs");
2744 }
2745}