1use crate::tree::payload_processor::bal::bal_to_hashed_post_state;
4use alloy_eip7928::BlockAccessList;
5use alloy_evm::block::StateChangeSource;
6use alloy_primitives::{keccak256, map::HashSet, B256};
7use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
8use derive_more::derive::Deref;
9use metrics::{Gauge, Histogram};
10use reth_metrics::Metrics;
11use reth_provider::AccountReader;
12use reth_revm::state::EvmState;
13use reth_trie::{
14 added_removed_keys::MultiAddedRemovedKeys, proof_v2, HashedPostState, HashedStorage,
15 MultiProofTargets,
16};
17#[cfg(test)]
18use reth_trie_parallel::stats::ParallelTrieTracker;
19use reth_trie_parallel::{
20 proof::ParallelProof,
21 proof_task::{
22 AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
23 ProofWorkerHandle,
24 },
25 targets_v2::MultiProofTargetsV2,
26};
27use revm_primitives::map::{hash_map, B256Map};
28use std::{collections::BTreeMap, sync::Arc, time::Instant};
29use tracing::{debug, error, instrument, trace};
30
/// Origin of a state update handled by the multiproof task.
#[derive(Clone, Copy)]
pub enum Source {
    /// Update tagged with an EVM [`StateChangeSource`].
    Evm(StateChangeSource),
    /// Update derived from a block access list.
    BlockAccessList,
}
39
40impl std::fmt::Debug for Source {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Self::Evm(source) => source.fmt(f),
44 Self::BlockAccessList => f.write_str("BlockAccessList"),
45 }
46 }
47}
48
impl From<StateChangeSource> for Source {
    /// Wraps an EVM state-change source in [`Source::Evm`].
    fn from(source: StateChangeSource) -> Self {
        Self::Evm(source)
    }
}
54
/// Cap on the summed `chunking_length()` of the prefetch targets merged into a single batch.
const PREFETCH_MAX_BATCH_TARGETS: usize = 512;

/// Cap on the number of prefetch messages merged into a single batch.
const PREFETCH_MAX_BATCH_MESSAGES: usize = 16;

/// Default value of `MultiProofTask::max_targets_for_chunking`, which the task forwards to
/// `dispatch_with_chunking`.
pub(crate) const DEFAULT_MAX_TARGETS_FOR_CHUNKING: usize = 300;
67
/// A hashed state update paired with the proof result computed for it.
///
/// Values of this type are what the multiproof task sends over its `to_sparse_trie` channel.
#[derive(Debug)]
pub struct SparseTrieUpdate {
    /// Hashed post-state carried by this update.
    pub(crate) state: HashedPostState,
    /// Proof result accompanying `state`.
    pub(crate) multiproof: ProofResult,
}
77
78impl SparseTrieUpdate {
79 pub(super) fn is_empty(&self) -> bool {
81 self.state.is_empty() && self.multiproof.is_empty()
82 }
83
84 #[cfg(test)]
86 pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
87 let stats = ParallelTrieTracker::default().finish();
88 Ok(Self {
89 state: HashedPostState::default(),
90 multiproof: ProofResult::Legacy(multiproof.try_into()?, stats),
91 })
92 }
93
94 pub(super) fn extend(&mut self, other: Self) {
96 self.state.extend(other.state);
97 self.multiproof.extend(other.multiproof);
98 }
99}
100
/// Messages consumed by the multiproof task's main loop.
#[derive(Debug)]
pub enum MultiProofMessage {
    /// Request to fetch proofs for the given targets ahead of any state update.
    PrefetchProofs(VersionedMultiProofTargets),
    /// New EVM state together with the source that produced it.
    StateUpdate(Source, EvmState),
    /// A state update that needs no proof calculation; it is still routed through the proof
    /// sequencer so delivery order is preserved.
    EmptyProof {
        /// Sequence number assigned to this update.
        sequence_number: u64,
        /// State carried by the update.
        state: HashedPostState,
    },
    /// An already-hashed state update.
    HashedStateUpdate(HashedPostState),
    /// A block access list that the task converts into a hashed state update; once handled, the
    /// task also marks state updates as finished.
    BlockAccessList(Arc<BlockAccessList>),
    /// Signals that no further state updates will arrive.
    FinishedStateUpdates,
}
131
/// Hands out sequence numbers for proof requests and releases finished proofs in sequence order.
#[derive(Debug, Default)]
struct ProofSequencer {
    /// Next sequence number to hand out.
    next_sequence: u64,
    /// Sequence number of the next proof that may be delivered.
    next_to_deliver: u64,
    /// Proofs that arrived ahead of `next_to_deliver`, keyed by sequence number.
    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
}
142
143impl ProofSequencer {
144 const fn next_sequence(&mut self) -> u64 {
146 let seq = self.next_sequence;
147 self.next_sequence += 1;
148 seq
149 }
150
151 fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
154 if sequence == self.next_to_deliver {
157 let mut consecutive_proofs = Vec::with_capacity(1);
158 consecutive_proofs.push(update);
159 self.next_to_deliver += 1;
160
161 while let Some(pending) = self.pending_proofs.remove(&self.next_to_deliver) {
163 consecutive_proofs.push(pending);
164 self.next_to_deliver += 1;
165 }
166
167 return consecutive_proofs;
168 }
169
170 if sequence > self.next_to_deliver {
171 self.pending_proofs.insert(sequence, update);
172 }
173
174 Vec::new()
175 }
176
177 pub(crate) fn has_pending(&self) -> bool {
179 !self.pending_proofs.is_empty()
180 }
181}
182
/// Wrapper around a [`MultiProofMessage`] sender that sends
/// [`MultiProofMessage::FinishedStateUpdates`] when it is dropped.
#[derive(Deref, Debug)]
pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);
190
impl StateHookSender {
    /// Wraps `inner` without sending anything.
    pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
        Self(inner)
    }
}
196
197impl Drop for StateHookSender {
198 fn drop(&mut self) {
199 let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
201 }
202}
203
204pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
205 let mut hashed_state = HashedPostState::with_capacity(update.len());
206
207 for (address, account) in update {
208 if account.is_touched() {
209 let hashed_address = keccak256(address);
210 trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
211
212 let destroyed = account.is_selfdestructed();
213 let info = if destroyed { None } else { Some(account.info.into()) };
214 hashed_state.accounts.insert(hashed_address, info);
215
216 let mut changed_storage_iter = account
217 .storage
218 .into_iter()
219 .filter(|(_slot, value)| value.is_changed())
220 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
221 .peekable();
222
223 if destroyed {
224 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
225 } else if changed_storage_iter.peek().is_some() {
226 hashed_state
227 .storages
228 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
229 }
230 }
231 }
232
233 hashed_state
234}
235
236fn extend_multiproof_targets(dest: &mut MultiProofTargets, src: &VersionedMultiProofTargets) {
239 match src {
240 VersionedMultiProofTargets::Legacy(targets) => {
241 dest.extend_ref(targets);
242 }
243 VersionedMultiProofTargets::V2(targets) => {
244 for target in &targets.account_targets {
246 dest.entry(target.key()).or_default();
247 }
248
249 for (hashed_address, slots) in &targets.storage_targets {
251 let slot_set = dest.entry(*hashed_address).or_default();
252 for slot in slots {
253 slot_set.insert(slot.key());
254 }
255 }
256 }
257 }
258}
259
/// Proof targets in either the legacy or the V2 representation.
#[derive(Debug)]
pub enum VersionedMultiProofTargets {
    /// Targets in the legacy [`MultiProofTargets`] format.
    Legacy(MultiProofTargets),
    /// Targets in the [`MultiProofTargetsV2`] format.
    V2(MultiProofTargetsV2),
}
268
269impl VersionedMultiProofTargets {
270 fn is_empty(&self) -> bool {
272 match self {
273 Self::Legacy(targets) => targets.is_empty(),
274 Self::V2(targets) => targets.is_empty(),
275 }
276 }
277
278 fn account_targets_len(&self) -> usize {
280 match self {
281 Self::Legacy(targets) => targets.len(),
282 Self::V2(targets) => targets.account_targets.len(),
283 }
284 }
285
286 fn storage_targets_len(&self) -> usize {
288 match self {
289 Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum::<usize>(),
290 Self::V2(targets) => {
291 targets.storage_targets.values().map(|slots| slots.len()).sum::<usize>()
292 }
293 }
294 }
295
296 fn len(&self) -> usize {
298 match self {
299 Self::Legacy(targets) => targets.len(),
300 Self::V2(targets) => targets.account_targets.len(),
301 }
302 }
303
304 fn storage_count(&self) -> usize {
306 match self {
307 Self::Legacy(targets) => targets.values().map(|slots| slots.len()).sum(),
308 Self::V2(targets) => targets.storage_targets.values().map(|slots| slots.len()).sum(),
309 }
310 }
311
312 fn chunking_length(&self) -> usize {
314 match self {
315 Self::Legacy(targets) => targets.chunking_length(),
316 Self::V2(targets) => targets.chunking_length(),
317 }
318 }
319
320 fn retain_difference(&mut self, other: &MultiProofTargets) {
323 match self {
324 Self::Legacy(targets) => {
325 targets.retain_difference(other);
326 }
327 Self::V2(targets) => {
328 targets.account_targets.retain(|target| !other.contains_key(&target.key()));
330
331 targets.storage_targets.retain(|hashed_address, slots| {
333 if let Some(other_slots) = other.get(hashed_address) {
334 slots.retain(|slot| !other_slots.contains(&slot.key()));
335 !slots.is_empty()
336 } else {
337 true
338 }
339 });
340 }
341 }
342 }
343
344 fn extend(&mut self, other: Self) {
348 match (self, other) {
349 (Self::Legacy(dest), Self::Legacy(src)) => {
350 dest.extend(src);
351 }
352 (Self::V2(dest), Self::V2(src)) => {
353 dest.account_targets.extend(src.account_targets);
354 for (addr, slots) in src.storage_targets {
355 dest.storage_targets.entry(addr).or_default().extend(slots);
356 }
357 }
358 _ => panic!("Cannot extend VersionedMultiProofTargets with mismatched variants"),
359 }
360 }
361
362 fn chunks(self, chunk_size: usize) -> Box<dyn Iterator<Item = Self>> {
364 match self {
365 Self::Legacy(targets) => {
366 Box::new(MultiProofTargets::chunks(targets, chunk_size).map(Self::Legacy))
367 }
368 Self::V2(targets) => Box::new(targets.chunks(chunk_size).map(Self::V2)),
369 }
370 }
371}
372
/// Everything needed to dispatch one multiproof calculation.
#[derive(Debug)]
struct MultiproofInput {
    /// Source of the state update, or `None` for prefetch requests.
    source: Option<Source>,
    /// State update that travels with the proof result.
    hashed_state_update: HashedPostState,
    /// Targets to compute the proof for.
    proof_targets: VersionedMultiProofTargets,
    /// Sequence number used to deliver results in order.
    proof_sequence_number: u64,
    /// Sender used to report an [`MultiProofMessage::EmptyProof`] back to the task.
    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
    /// Added/removed keys forwarded with legacy proof requests.
    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
}
383
384impl MultiproofInput {
385 fn send_empty_proof(self) {
387 let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
388 sequence_number: self.proof_sequence_number,
389 state: self.hashed_state_update,
390 });
391 }
392}
393
/// Dispatches multiproof calculations to the proof workers and records worker metrics.
#[derive(Debug)]
pub struct MultiproofManager {
    /// Handle used to dispatch account multiproofs and to query worker/queue counts.
    proof_worker_handle: ProofWorkerHandle,
    /// Sender cloned into every dispatched request so that results come back to the task.
    proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Metrics recorded by the manager.
    metrics: MultiProofTaskMetrics,
}
414
415impl MultiproofManager {
416 fn new(
418 metrics: MultiProofTaskMetrics,
419 proof_worker_handle: ProofWorkerHandle,
420 proof_result_tx: CrossbeamSender<ProofResultMessage>,
421 ) -> Self {
422 metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
424 metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
425
426 Self { metrics, proof_worker_handle, proof_result_tx }
427 }
428
429 fn dispatch(&self, input: MultiproofInput) {
431 if input.proof_targets.is_empty() {
433 trace!(
434 sequence_number = input.proof_sequence_number,
435 "No proof targets, sending empty multiproof back immediately"
436 );
437 input.send_empty_proof();
438 return;
439 }
440
441 self.dispatch_multiproof(input);
442 }
443
444 fn on_calculation_complete(&self) {
446 self.metrics
447 .active_storage_workers_histogram
448 .record(self.proof_worker_handle.active_storage_workers() as f64);
449 self.metrics
450 .active_account_workers_histogram
451 .record(self.proof_worker_handle.active_account_workers() as f64);
452 self.metrics
453 .pending_storage_multiproofs_histogram
454 .record(self.proof_worker_handle.pending_storage_tasks() as f64);
455 self.metrics
456 .pending_account_multiproofs_histogram
457 .record(self.proof_worker_handle.pending_account_tasks() as f64);
458 }
459
460 fn dispatch_multiproof(&self, multiproof_input: MultiproofInput) {
462 let MultiproofInput {
463 source,
464 hashed_state_update,
465 proof_targets,
466 proof_sequence_number,
467 state_root_message_sender: _,
468 multi_added_removed_keys,
469 } = multiproof_input;
470
471 trace!(
472 target: "engine::tree::payload_processor::multiproof",
473 proof_sequence_number,
474 ?proof_targets,
475 account_targets = proof_targets.account_targets_len(),
476 storage_targets = proof_targets.storage_targets_len(),
477 ?source,
478 "Dispatching multiproof to workers"
479 );
480
481 let start = Instant::now();
482
483 let proof_result_sender = ProofResultContext::new(
485 self.proof_result_tx.clone(),
486 proof_sequence_number,
487 hashed_state_update,
488 start,
489 );
490
491 let input = match proof_targets {
492 VersionedMultiProofTargets::Legacy(proof_targets) => {
493 let frozen_prefix_sets = ParallelProof::extend_prefix_sets_with_targets(
495 &Default::default(),
496 &proof_targets,
497 );
498
499 AccountMultiproofInput::Legacy {
500 targets: proof_targets,
501 prefix_sets: frozen_prefix_sets,
502 collect_branch_node_masks: true,
503 multi_added_removed_keys,
504 proof_result_sender,
505 }
506 }
507 VersionedMultiProofTargets::V2(proof_targets) => {
508 AccountMultiproofInput::V2 { targets: proof_targets, proof_result_sender }
509 }
510 };
511
512 if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
514 error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof");
515 return;
516 }
517
518 self.metrics
519 .active_storage_workers_histogram
520 .record(self.proof_worker_handle.active_storage_workers() as f64);
521 self.metrics
522 .active_account_workers_histogram
523 .record(self.proof_worker_handle.active_account_workers() as f64);
524 self.metrics
525 .pending_storage_multiproofs_histogram
526 .record(self.proof_worker_handle.pending_storage_tasks() as f64);
527 self.metrics
528 .pending_account_multiproofs_histogram
529 .record(self.proof_worker_handle.pending_account_tasks() as f64);
530 }
531}
532
/// Metrics of the multiproof task, registered under the `tree.root` scope.
// NOTE(review): the `Metrics` derive may use the field doc comments below as metric
// descriptions — confirm the wording against the derive's requirements before merging.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of active storage workers, sampled on dispatch and on proof completion.
    pub active_storage_workers_histogram: Histogram,
    /// Histogram of active account workers, sampled on dispatch and on proof completion.
    pub active_account_workers_histogram: Histogram,
    /// Gauge holding the total number of storage workers.
    pub max_storage_workers: Gauge,
    /// Gauge holding the total number of account workers.
    pub max_account_workers: Gauge,
    /// Histogram of pending storage tasks, sampled on dispatch and on proof completion.
    pub pending_storage_multiproofs_histogram: Histogram,
    /// Histogram of pending account tasks, sampled on dispatch and on proof completion.
    pub pending_account_multiproofs_histogram: Histogram,

    /// Histogram of the number of account targets per prefetch request.
    pub prefetch_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of storage targets per prefetch request.
    pub prefetch_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of chunks per prefetch request.
    pub prefetch_proof_chunks_histogram: Histogram,

    /// Histogram of the number of account targets per state update.
    pub state_update_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of storage targets per state update.
    pub state_update_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of chunks per state update.
    pub state_update_proof_chunks_histogram: Histogram,

    /// Histogram of the number of prefetch messages merged into one batch.
    pub prefetch_batch_size_histogram: Histogram,

    /// Histogram of proof calculation durations reported by the workers.
    pub proof_calculation_duration_histogram: Histogram,

    /// Histogram of sparse trie update durations (not recorded in this part of the file).
    pub sparse_trie_update_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations (not recorded in this part of the file).
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations (not recorded in this part of the file).
    pub sparse_trie_total_duration_histogram: Histogram,

    /// Histogram of the number of state update proofs requested per task run.
    pub state_updates_received_histogram: Histogram,
    /// Histogram of the number of proofs processed per task run.
    pub proofs_processed_histogram: Histogram,
    /// Histogram of the time from the first update until the task finished.
    pub multiproof_task_total_duration_histogram: Histogram,
    /// Histogram of the time from task start until the first update or prefetch arrived.
    pub first_update_wait_time_histogram: Histogram,
    /// Histogram of the time from the end of state updates until the task finished.
    pub last_proof_wait_time_histogram: Histogram,
    /// Histogram of trie-reuse preparation durations (not recorded in this part of the file).
    pub into_trie_for_reuse_duration_histogram: Histogram,
    /// Histogram of sparse trie cache wait durations (not recorded in this part of the file).
    pub sparse_trie_cache_wait_duration_histogram: Histogram,
}
591
/// Task that turns state updates and prefetch requests into multiproof calculations and
/// forwards the results, in sequence order, to the sparse trie.
#[derive(Debug)]
pub(super) struct MultiProofTask {
    /// Chunk size forwarded to `dispatch_with_chunking`.
    chunk_size: Option<usize>,
    /// Receiver for control and state messages.
    rx: CrossbeamReceiver<MultiProofMessage>,
    /// Sender for the same channel; used to post empty proofs back to the task itself and
    /// cloned into every dispatched request.
    tx: CrossbeamSender<MultiProofMessage>,
    /// Receiver for proof results produced by the workers.
    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Sender for combined updates destined for the sparse trie.
    to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
    /// Proof targets that have already been requested.
    fetched_proof_targets: MultiProofTargets,
    /// Added/removed keys tracked across the state updates seen so far.
    multi_added_removed_keys: MultiAddedRemovedKeys,
    /// Assigns sequence numbers and orders finished proofs.
    proof_sequencer: ProofSequencer,
    /// Dispatches proof calculations to the workers.
    multiproof_manager: MultiproofManager,
    /// Task metrics.
    metrics: MultiProofTaskMetrics,
    /// Threshold forwarded to `dispatch_with_chunking`; starts at
    /// [`DEFAULT_MAX_TARGETS_FOR_CHUNKING`].
    max_targets_for_chunking: usize,
    /// Whether V2 proof targets and empty V2 proof results are used.
    v2_proofs_enabled: bool,
}
722
723impl MultiProofTask {
724 pub(super) fn new(
727 proof_worker_handle: ProofWorkerHandle,
728 to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
729 chunk_size: Option<usize>,
730 tx: CrossbeamSender<MultiProofMessage>,
731 rx: CrossbeamReceiver<MultiProofMessage>,
732 ) -> Self {
733 let (proof_result_tx, proof_result_rx) = unbounded();
734 let metrics = MultiProofTaskMetrics::default();
735
736 Self {
737 chunk_size,
738 rx,
739 tx,
740 proof_result_rx,
741 to_sparse_trie,
742 fetched_proof_targets: Default::default(),
743 multi_added_removed_keys: MultiAddedRemovedKeys::new(),
744 proof_sequencer: ProofSequencer::default(),
745 multiproof_manager: MultiproofManager::new(
746 metrics.clone(),
747 proof_worker_handle,
748 proof_result_tx,
749 ),
750 metrics,
751 max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
752 v2_proofs_enabled: false,
753 }
754 }
755
    /// Builder-style setter for the `v2_proofs_enabled` flag; returns the updated task.
    pub(super) const fn with_v2_proofs_enabled(mut self, v2_proofs_enabled: bool) -> Self {
        self.v2_proofs_enabled = v2_proofs_enabled;
        self
    }
761
762 #[instrument(
766 level = "debug",
767 target = "engine::tree::payload_processor::multiproof",
768 skip_all,
769 fields(accounts = targets.account_targets_len(), chunks = 0)
770 )]
771 fn on_prefetch_proof(&mut self, mut targets: VersionedMultiProofTargets) -> u64 {
772 targets.retain_difference(&self.fetched_proof_targets);
774
775 if targets.is_empty() {
776 return 0;
777 }
778
779 extend_multiproof_targets(&mut self.fetched_proof_targets, &targets);
780
781 let multi_added_removed_keys =
789 if let VersionedMultiProofTargets::Legacy(legacy_targets) = &targets {
790 self.multi_added_removed_keys.touch_accounts(legacy_targets.keys().copied());
791 Some(Arc::new(MultiAddedRemovedKeys {
792 account: self.multi_added_removed_keys.account.clone(),
793 storages: legacy_targets
794 .keys()
795 .filter_map(|k| {
796 self.multi_added_removed_keys.storages.get(k).map(|v| (*k, v.clone()))
797 })
798 .collect(),
799 }))
800 } else {
801 None
802 };
803
804 self.metrics.prefetch_proof_targets_accounts_histogram.record(targets.len() as f64);
805 self.metrics
806 .prefetch_proof_targets_storages_histogram
807 .record(targets.storage_count() as f64);
808
809 let chunking_len = targets.chunking_length();
810 let available_account_workers =
811 self.multiproof_manager.proof_worker_handle.available_account_workers();
812 let available_storage_workers =
813 self.multiproof_manager.proof_worker_handle.available_storage_workers();
814 let num_chunks = dispatch_with_chunking(
815 targets,
816 chunking_len,
817 self.chunk_size,
818 self.max_targets_for_chunking,
819 available_account_workers,
820 available_storage_workers,
821 VersionedMultiProofTargets::chunks,
822 |proof_targets| {
823 self.multiproof_manager.dispatch(MultiproofInput {
824 source: None,
825 hashed_state_update: Default::default(),
826 proof_targets,
827 proof_sequence_number: self.proof_sequencer.next_sequence(),
828 state_root_message_sender: self.tx.clone(),
829 multi_added_removed_keys: multi_added_removed_keys.clone(),
830 });
831 },
832 );
833 self.metrics.prefetch_proof_chunks_histogram.record(num_chunks as f64);
834
835 num_chunks as u64
836 }
837
838 fn is_done(&self, metrics: &MultiproofBatchMetrics, ctx: &MultiproofBatchCtx) -> bool {
840 let all_proofs_processed = metrics.all_proofs_processed();
841 let no_pending = !self.proof_sequencer.has_pending();
842 let updates_finished = ctx.updates_finished();
843 trace!(
844 target: "engine::tree::payload_processor::multiproof",
845 proofs_processed = metrics.proofs_processed,
846 state_update_proofs_requested = metrics.state_update_proofs_requested,
847 prefetch_proofs_requested = metrics.prefetch_proofs_requested,
848 no_pending,
849 updates_finished,
850 "Checking end condition"
851 );
852 all_proofs_processed && no_pending && updates_finished
853 }
854
855 #[instrument(
860 level = "debug",
861 target = "engine::tree::payload_processor::multiproof",
862 skip(self, update),
863 fields(accounts = update.len(), chunks = 0)
864 )]
865 fn on_state_update(&mut self, source: Source, update: EvmState) -> u64 {
866 let hashed_state_update = evm_state_to_hashed_post_state(update);
867 self.on_hashed_state_update(source, hashed_state_update)
868 }
869
    /// Processes a hashed state update and returns the number of proofs requested for it.
    ///
    /// The update is split into a part whose targets were already fetched and a part that
    /// still needs proofs. The fetched part is posted back to the task as an
    /// [`MultiProofMessage::EmptyProof`] (counted as one request); the rest is chunked and
    /// dispatched to the workers, and its targets are added to `fetched_proof_targets`.
    fn on_hashed_state_update(
        &mut self,
        source: Source,
        hashed_state_update: HashedPostState,
    ) -> u64 {
        // Track added/removed keys before partitioning, since the partition consults them.
        self.multi_added_removed_keys.update_with_state(&hashed_state_update);

        let (fetched_state_update, not_fetched_state_update) = hashed_state_update
            .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);

        let mut state_updates = 0;
        if !fetched_state_update.is_empty() {
            // Already-fetched targets need no new proof; the update still takes a sequence
            // number so it is delivered in order with the others.
            let _ = self.tx.send(MultiProofMessage::EmptyProof {
                sequence_number: self.proof_sequencer.next_sequence(),
                state: fetched_state_update,
            });
            state_updates += 1;
        }

        if not_fetched_state_update.is_empty() {
            return state_updates;
        }

        // Snapshot of the added/removed keys limited to the accounts touched by the
        // not-yet-fetched part; accounts without tracked storage keys get a default entry.
        let multi_added_removed_keys = Arc::new(MultiAddedRemovedKeys {
            account: self.multi_added_removed_keys.account.clone(),
            storages: {
                let mut storages = B256Map::with_capacity_and_hasher(
                    not_fetched_state_update.storages.len(),
                    Default::default(),
                );

                for account in not_fetched_state_update
                    .storages
                    .keys()
                    .chain(not_fetched_state_update.accounts.keys())
                {
                    if let hash_map::Entry::Vacant(entry) = storages.entry(*account) {
                        entry.insert(
                            self.multi_added_removed_keys
                                .storages
                                .get(account)
                                .cloned()
                                .unwrap_or_default(),
                        );
                    }
                }

                storages
            },
        });

        let chunking_len = not_fetched_state_update.chunking_length();
        // Collects the targets of all dispatched chunks, for metrics and for the final merge
        // into `fetched_proof_targets`.
        let mut spawned_proof_targets = MultiProofTargets::default();
        let available_account_workers =
            self.multiproof_manager.proof_worker_handle.available_account_workers();
        let available_storage_workers =
            self.multiproof_manager.proof_worker_handle.available_storage_workers();

        let num_chunks = dispatch_with_chunking(
            not_fetched_state_update,
            chunking_len,
            self.chunk_size,
            self.max_targets_for_chunking,
            available_account_workers,
            available_storage_workers,
            HashedPostState::chunks,
            |hashed_state_update| {
                let proof_targets = get_proof_targets(
                    &hashed_state_update,
                    &self.fetched_proof_targets,
                    &multi_added_removed_keys,
                    self.v2_proofs_enabled,
                );
                extend_multiproof_targets(&mut spawned_proof_targets, &proof_targets);

                self.multiproof_manager.dispatch(MultiproofInput {
                    source: Some(source),
                    hashed_state_update,
                    proof_targets,
                    proof_sequence_number: self.proof_sequencer.next_sequence(),
                    state_root_message_sender: self.tx.clone(),
                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
                });
            },
        );
        self.metrics
            .state_update_proof_targets_accounts_histogram
            .record(spawned_proof_targets.len() as f64);
        self.metrics
            .state_update_proof_targets_storages_histogram
            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
        self.metrics.state_update_proof_chunks_histogram.record(num_chunks as f64);

        // Only now mark the dispatched targets as fetched; the closure above reads
        // `fetched_proof_targets` as it was before this update.
        self.fetched_proof_targets.extend(spawned_proof_targets);

        state_updates + num_chunks as u64
    }
976
977 fn on_proof(
979 &mut self,
980 sequence_number: u64,
981 update: SparseTrieUpdate,
982 ) -> Option<SparseTrieUpdate> {
983 let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
984
985 ready_proofs
986 .into_iter()
987 .reduce(|mut acc_update, update| {
989 acc_update.extend(update);
990 acc_update
991 })
992 .filter(|proof| !proof.is_empty())
994 }
995
    /// Handles one message from the control channel.
    ///
    /// Returns `true` when the main loop should stop (either everything is done or the block
    /// access list could not be converted) and `false` when it should keep running.
    ///
    /// * `msg` - message to handle.
    /// * `ctx` - loop state: timing marks, the deferred message slot and the prefetch batch
    ///   buffer.
    /// * `batch_metrics` - counters of requested and processed proofs.
    /// * `provider` - account reader used when converting a block access list.
    fn process_multiproof_message<P>(
        &mut self,
        msg: MultiProofMessage,
        ctx: &mut MultiproofBatchCtx,
        batch_metrics: &mut MultiproofBatchMetrics,
        provider: &P,
    ) -> bool
    where
        P: AccountReader,
    {
        match msg {
            MultiProofMessage::PrefetchProofs(targets) => {
                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::PrefetchProofs");

                // The first prefetch or state update marks the start of the calculation.
                if ctx.first_update_time.is_none() {
                    self.metrics
                        .first_update_wait_time_histogram
                        .record(ctx.start.elapsed().as_secs_f64());
                    ctx.first_update_time = Some(Instant::now());
                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
                }

                let mut accumulated_count = targets.chunking_length();
                ctx.accumulated_prefetch_targets.clear();
                ctx.accumulated_prefetch_targets.push(targets);

                // Merge further prefetch messages that are already queued, up to the target
                // and message limits.
                while accumulated_count < PREFETCH_MAX_BATCH_TARGETS &&
                    ctx.accumulated_prefetch_targets.len() < PREFETCH_MAX_BATCH_MESSAGES
                {
                    match self.rx.try_recv() {
                        Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
                            let next_count = next_targets.chunking_length();
                            // A message that would exceed the target limit is deferred to the
                            // next loop iteration instead of being merged.
                            if accumulated_count + next_count > PREFETCH_MAX_BATCH_TARGETS {
                                ctx.pending_msg =
                                    Some(MultiProofMessage::PrefetchProofs(next_targets));
                                break;
                            }
                            accumulated_count += next_count;
                            ctx.accumulated_prefetch_targets.push(next_targets);
                        }
                        Ok(MultiProofMessage::EmptyProof { sequence_number, state }) => {
                            // Empty proofs are handled inline so they do not end the batch.
                            // NOTE(review): this bumps `proofs_processed`, but the arm returns
                            // `false` without calling `is_done` — confirm a later message is
                            // always guaranteed to trigger the end check.
                            batch_metrics.proofs_processed += 1;
                            if let Some(combined_update) = self.on_proof(
                                sequence_number,
                                SparseTrieUpdate {
                                    state,
                                    multiproof: ProofResult::empty(self.v2_proofs_enabled),
                                },
                            ) {
                                let _ = self.to_sparse_trie.send(combined_update);
                            }
                        }
                        Ok(other_msg) => {
                            // Any other message ends the batch and is handled next.
                            ctx.pending_msg = Some(other_msg);
                            break;
                        }
                        Err(_) => break,
                    }
                }

                let num_batched = ctx.accumulated_prefetch_targets.len();
                self.metrics.prefetch_batch_size_histogram.record(num_batched as f64);

                // Merge the batch into one target set; `drain` leaves the buffer allocated.
                let mut accumulated_iter = ctx.accumulated_prefetch_targets.drain(..);
                let mut merged_targets =
                    accumulated_iter.next().expect("prefetch batch always has at least one entry");
                for next_targets in accumulated_iter {
                    merged_targets.extend(next_targets);
                }

                let account_targets = merged_targets.len();
                let storage_targets = merged_targets.storage_count();
                batch_metrics.prefetch_proofs_requested += self.on_prefetch_proof(merged_targets);
                trace!(
                    target: "engine::tree::payload_processor::multiproof",
                    account_targets,
                    storage_targets,
                    prefetch_proofs_requested = batch_metrics.prefetch_proofs_requested,
                    num_batched,
                    "Dispatched prefetch batch"
                );

                false
            }
            MultiProofMessage::StateUpdate(source, update) => {
                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");

                // The first prefetch or state update marks the start of the calculation.
                if ctx.first_update_time.is_none() {
                    self.metrics
                        .first_update_wait_time_histogram
                        .record(ctx.start.elapsed().as_secs_f64());
                    ctx.first_update_time = Some(Instant::now());
                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
                }

                let update_len = update.len();
                batch_metrics.state_update_proofs_requested += self.on_state_update(source, update);
                trace!(
                    target: "engine::tree::payload_processor::multiproof",
                    ?source,
                    len = update_len,
                    state_update_proofs_requested = ?batch_metrics.state_update_proofs_requested,
                    "Dispatched state update"
                );

                false
            }
            MultiProofMessage::BlockAccessList(bal) => {
                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::BAL");

                if ctx.first_update_time.is_none() {
                    self.metrics
                        .first_update_wait_time_histogram
                        .record(ctx.start.elapsed().as_secs_f64());
                    ctx.first_update_time = Some(Instant::now());
                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation from BAL");
                }

                // Convert the access list into a hashed state update; a conversion failure
                // stops the task.
                match bal_to_hashed_post_state(&bal, provider) {
                    Ok(hashed_state) => {
                        debug!(
                            target: "engine::tree::payload_processor::multiproof",
                            accounts = hashed_state.accounts.len(),
                            storages = hashed_state.storages.len(),
                            "Processing BAL state update"
                        );

                        batch_metrics.state_update_proofs_requested +=
                            self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
                    }
                    Err(err) => {
                        error!(target: "engine::tree::payload_processor::multiproof", ?err, "Failed to convert BAL to hashed state");
                        return true;
                    }
                }

                // After a block access list no further state updates are expected.
                ctx.updates_finished_time = Some(Instant::now());

                if self.is_done(batch_metrics, ctx) {
                    debug!(
                        target: "engine::tree::payload_processor::multiproof",
                        "BAL processed and all proofs complete, ending calculation"
                    );
                    return true;
                }
                false
            }
            MultiProofMessage::FinishedStateUpdates => {
                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::FinishedStateUpdates");

                ctx.updates_finished_time = Some(Instant::now());

                if self.is_done(batch_metrics, ctx) {
                    debug!(
                        target: "engine::tree::payload_processor::multiproof",
                        "State updates finished and all proofs processed, ending calculation"
                    );
                    return true;
                }
                false
            }
            MultiProofMessage::EmptyProof { sequence_number, state } => {
                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::EmptyProof");

                batch_metrics.proofs_processed += 1;

                // Route the update through the sequencer with an empty proof result.
                if let Some(combined_update) = self.on_proof(
                    sequence_number,
                    SparseTrieUpdate {
                        state,
                        multiproof: ProofResult::empty(self.v2_proofs_enabled),
                    },
                ) {
                    let _ = self.to_sparse_trie.send(combined_update);
                }

                if self.is_done(batch_metrics, ctx) {
                    debug!(
                        target: "engine::tree::payload_processor::multiproof",
                        "State updates finished and all proofs processed, ending calculation"
                    );
                    return true;
                }
                false
            }
            MultiProofMessage::HashedStateUpdate(hashed_state) => {
                // Already-hashed updates are attributed to the block-access-list source.
                batch_metrics.state_update_proofs_requested +=
                    self.on_hashed_state_update(Source::BlockAccessList, hashed_state);
                false
            }
        }
    }
1210
    /// Runs the task until all requested proofs are processed and state updates are finished.
    ///
    /// Each loop iteration first handles a message deferred by the previous iteration, if any.
    /// Otherwise it waits on the proof-result channel and the control channel. Finished proofs
    /// are sequenced and forwarded to the sparse trie; control messages go through
    /// [`Self::process_multiproof_message`].
    ///
    /// The summary log and final metrics are only emitted when the loop ends normally. A proof
    /// calculation error or a closed channel logs an error and returns immediately.
    ///
    /// * `provider` - account reader passed on to message processing.
    #[instrument(
        level = "debug",
        name = "MultiProofTask::run",
        target = "engine::tree::payload_processor::multiproof",
        skip_all
    )]
    pub(crate) fn run<P>(mut self, provider: P)
    where
        P: AccountReader,
    {
        let mut ctx = MultiproofBatchCtx::new(Instant::now());
        let mut batch_metrics = MultiproofBatchMetrics::default();

        'main: loop {
            trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop");

            // A message set aside during prefetch batching is handled before receiving more.
            if let Some(msg) = ctx.pending_msg.take() {
                if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
                    break 'main;
                }
                continue;
            }

            // The proof-result channel is listed first in `select_biased!`, so it is
            // presumably preferred when both channels are ready — see the crossbeam docs.
            crossbeam_channel::select_biased! {
                recv(self.proof_result_rx) -> proof_msg => {
                    match proof_msg {
                        Ok(proof_result) => {
                            batch_metrics.proofs_processed += 1;

                            self.metrics
                                .proof_calculation_duration_histogram
                                .record(proof_result.elapsed);

                            self.multiproof_manager.on_calculation_complete();

                            match proof_result.result {
                                Ok(proof_result_data) => {
                                    trace!(
                                        target: "engine::tree::payload_processor::multiproof",
                                        sequence = proof_result.sequence_number,
                                        total_proofs = batch_metrics.proofs_processed,
                                        "Processing calculated proof from worker"
                                    );

                                    let update = SparseTrieUpdate {
                                        state: proof_result.state,
                                        multiproof: proof_result_data,
                                    };

                                    if let Some(combined_update) =
                                        self.on_proof(proof_result.sequence_number, update)
                                    {
                                        let _ = self.to_sparse_trie.send(combined_update);
                                    }
                                }
                                Err(error) => {
                                    error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker");
                                    return
                                }
                            }

                            if self.is_done(&batch_metrics, &ctx) {
                                debug!(
                                    target: "engine::tree::payload_processor::multiproof",
                                    "State updates finished and all proofs processed, ending calculation"
                                );
                                break 'main
                            }
                        }
                        Err(_) => {
                            error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly");
                            return
                        }
                    }
                },
                recv(self.rx) -> message => {
                    let msg = match message {
                        Ok(m) => m,
                        Err(_) => {
                            error!(target: "engine::tree::payload_processor::multiproof", "State root related message channel closed unexpectedly");
                            return
                        }
                    };

                    if self.process_multiproof_message(msg, &mut ctx, &mut batch_metrics, &provider) {
                        break 'main;
                    }
                }
            }
        }

        debug!(
            target: "engine::tree::payload_processor::multiproof",
            total_updates = batch_metrics.state_update_proofs_requested,
            total_proofs = batch_metrics.proofs_processed,
            total_time = ?ctx.first_update_time.map(|t|t.elapsed()),
            time_since_updates_finished = ?ctx.updates_finished_time.map(|t|t.elapsed()),
            "All proofs processed, ending calculation"
        );

        // Final per-run metrics.
        self.metrics
            .state_updates_received_histogram
            .record(batch_metrics.state_update_proofs_requested as f64);
        self.metrics.proofs_processed_histogram.record(batch_metrics.proofs_processed as f64);
        if let Some(total_time) = ctx.first_update_time.map(|t| t.elapsed()) {
            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
        }

        if let Some(updates_finished_time) = ctx.updates_finished_time {
            self.metrics
                .last_proof_wait_time_histogram
                .record(updates_finished_time.elapsed().as_secs_f64());
        }
    }
1366}
1367
1368struct MultiproofBatchCtx {
1375 pending_msg: Option<MultiProofMessage>,
1379 first_update_time: Option<Instant>,
1381 start: Instant,
1383 updates_finished_time: Option<Instant>,
1386 accumulated_prefetch_targets: Vec<VersionedMultiProofTargets>,
1388}
1389
1390impl MultiproofBatchCtx {
1391 fn new(start: Instant) -> Self {
1393 Self {
1394 pending_msg: None,
1395 first_update_time: None,
1396 start,
1397 updates_finished_time: None,
1398 accumulated_prefetch_targets: Vec::with_capacity(PREFETCH_MAX_BATCH_MESSAGES),
1399 }
1400 }
1401
1402 const fn updates_finished(&self) -> bool {
1404 self.updates_finished_time.is_some()
1405 }
1406}
1407
#[derive(Default)]
/// Counters collected while a multiproof task runs; all of them start at zero.
struct MultiproofBatchMetrics {
    /// How many proof results have been processed.
    proofs_processed: u64,
    /// How many proofs were requested for state updates.
    state_update_proofs_requested: u64,
    /// How many proofs were requested for prefetching.
    prefetch_proofs_requested: u64,
}

impl MultiproofBatchMetrics {
    /// Returns `true` when at least as many proofs were processed as were requested in total
    /// (state-update requests plus prefetch requests).
    const fn all_proofs_processed(&self) -> bool {
        let requested = self.state_update_proofs_requested + self.prefetch_proofs_requested;
        requested <= self.proofs_processed
    }
}
1425
1426fn get_proof_targets(
1430 state_update: &HashedPostState,
1431 fetched_proof_targets: &MultiProofTargets,
1432 multi_added_removed_keys: &MultiAddedRemovedKeys,
1433 v2_enabled: bool,
1434) -> VersionedMultiProofTargets {
1435 if v2_enabled {
1436 let mut targets = MultiProofTargetsV2::default();
1437
1438 for &hashed_address in state_update.accounts.keys() {
1440 if !fetched_proof_targets.contains_key(&hashed_address) {
1441 targets.account_targets.push(hashed_address.into());
1442 }
1443 }
1444
1445 for (hashed_address, storage) in &state_update.storages {
1447 let fetched = fetched_proof_targets.get(hashed_address);
1448
1449 if storage.wiped && fetched.is_none() {
1451 targets.account_targets.push(Into::<proof_v2::Target>::into(*hashed_address));
1452 continue
1453 }
1454
1455 let changed_slots = storage
1456 .storage
1457 .keys()
1458 .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
1459 .map(|slot| Into::<proof_v2::Target>::into(*slot))
1460 .collect::<Vec<_>>();
1461
1462 if !changed_slots.is_empty() {
1463 targets.account_targets.push((*hashed_address).into());
1464 targets.storage_targets.insert(*hashed_address, changed_slots);
1465 }
1466 }
1467
1468 VersionedMultiProofTargets::V2(targets)
1469 } else {
1470 let mut targets = MultiProofTargets::default();
1471
1472 for hashed_address in state_update.accounts.keys() {
1474 if !fetched_proof_targets.contains_key(hashed_address) {
1475 targets.insert(*hashed_address, HashSet::default());
1476 }
1477 }
1478
1479 for (hashed_address, storage) in &state_update.storages {
1481 let fetched = fetched_proof_targets.get(hashed_address);
1482 let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
1483 let mut changed_slots = storage
1484 .storage
1485 .keys()
1486 .filter(|slot| {
1487 !fetched.is_some_and(|f| f.contains(*slot)) ||
1488 storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
1489 })
1490 .peekable();
1491
1492 if storage.wiped && fetched.is_none() {
1494 targets.entry(*hashed_address).or_default();
1495 }
1496
1497 if changed_slots.peek().is_some() {
1498 targets.entry(*hashed_address).or_default().extend(changed_slots);
1499 }
1500 }
1501
1502 VersionedMultiProofTargets::Legacy(targets)
1503 }
1504}
1505
/// Dispatches `items` either as a whole or split into chunks, returning how many times
/// `dispatch` was invoked.
///
/// Chunking is attempted when `chunking_len` exceeds `max_targets_for_chunking`, or when more
/// than one account worker or more than one storage worker is available. Even then, the items
/// are only split if a `chunk_size` is configured and `chunking_len` is strictly larger than it;
/// in that case `chunker(items, chunk_size)` produces the chunks and each one is handed to
/// `dispatch`. In every other case `dispatch` receives `items` unchanged and `1` is returned.
// The argument list mirrors what callers need to decide on chunking, hence the lint exception.
#[allow(clippy::too_many_arguments)]
pub(crate) fn dispatch_with_chunking<T, I>(
    items: T,
    chunking_len: usize,
    chunk_size: Option<usize>,
    max_targets_for_chunking: usize,
    available_account_workers: usize,
    available_storage_workers: usize,
    chunker: impl FnOnce(T, usize) -> I,
    mut dispatch: impl FnMut(T),
) -> usize
where
    I: IntoIterator<Item = T>,
{
    let spare_workers = available_account_workers > 1 || available_storage_workers > 1;
    let should_chunk = chunking_len > max_targets_for_chunking || spare_workers;

    match chunk_size {
        Some(size) if should_chunk && chunking_len > size => {
            // Hand out every chunk and count them on the way.
            chunker(items, size).into_iter().fold(0usize, |dispatched, chunk| {
                dispatch(chunk);
                dispatched + 1
            })
        }
        _ => {
            dispatch(items);
            1
        }
    }
}
1541
1542#[cfg(test)]
1543mod tests {
1544 use crate::tree::cached_state::CachedStateProvider;
1545
1546 use super::*;
1547 use alloy_eip7928::{AccountChanges, BalanceChange};
1548 use alloy_primitives::Address;
1549 use reth_provider::{
1550 providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
1551 BlockNumReader, BlockReader, ChangeSetReader, DatabaseProviderFactory, LatestStateProvider,
1552 PruneCheckpointReader, StageCheckpointReader, StateProviderBox, StorageChangeSetReader,
1553 StorageSettingsCache,
1554 };
1555 use reth_trie::MultiProof;
1556 use reth_trie_db::ChangesetCache;
1557 use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
1558 use revm_primitives::{B256, U256};
1559 use std::sync::{Arc, OnceLock};
1560
1561 fn get_test_runtime() -> &'static reth_tasks::Runtime {
1563 static TEST_RT: OnceLock<reth_tasks::Runtime> = OnceLock::new();
1564 TEST_RT.get_or_init(reth_tasks::Runtime::test)
1565 }
1566
1567 fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
1568 where
1569 F: DatabaseProviderFactory<
1570 Provider: BlockReader
1571 + StageCheckpointReader
1572 + PruneCheckpointReader
1573 + ChangeSetReader
1574 + StorageChangeSetReader
1575 + StorageSettingsCache
1576 + BlockNumReader,
1577 > + Clone
1578 + Send
1579 + 'static,
1580 {
1581 let runtime = get_test_runtime();
1582 let changeset_cache = ChangesetCache::new();
1583 let overlay_factory = OverlayStateProviderFactory::new(factory, changeset_cache);
1584 let task_ctx = ProofTaskCtx::new(overlay_factory);
1585 let proof_handle = ProofWorkerHandle::new(runtime, task_ctx, false);
1586 let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
1587 let (tx, rx) = crossbeam_channel::unbounded();
1588
1589 MultiProofTask::new(proof_handle, to_sparse_trie, Some(1), tx, rx)
1590 }
1591
1592 fn create_cached_provider<F>(factory: F) -> CachedStateProvider<StateProviderBox>
1593 where
1594 F: DatabaseProviderFactory<
1595 Provider: BlockReader
1596 + StageCheckpointReader
1597 + PruneCheckpointReader
1598 + reth_provider::StorageSettingsCache,
1599 > + Clone
1600 + Send
1601 + 'static,
1602 {
1603 let db_provider = factory.database_provider_ro().unwrap();
1604 let state_provider: StateProviderBox = Box::new(LatestStateProvider::new(db_provider));
1605 let cache = crate::tree::cached_state::ExecutionCache::new(1000);
1606 CachedStateProvider::new(state_provider, cache, Default::default())
1607 }
1608
1609 #[test]
1610 fn test_add_proof_in_sequence() {
1611 let mut sequencer = ProofSequencer::default();
1612 let proof1 = MultiProof::default();
1613 let proof2 = MultiProof::default();
1614 sequencer.next_sequence = 2;
1615
1616 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1617 assert_eq!(ready.len(), 1);
1618 assert!(!sequencer.has_pending());
1619
1620 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1621 assert_eq!(ready.len(), 1);
1622 assert!(!sequencer.has_pending());
1623 }
1624
1625 #[test]
1626 fn test_add_proof_out_of_order() {
1627 let mut sequencer = ProofSequencer::default();
1628 let proof1 = MultiProof::default();
1629 let proof2 = MultiProof::default();
1630 let proof3 = MultiProof::default();
1631 sequencer.next_sequence = 3;
1632
1633 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1634 assert_eq!(ready.len(), 0);
1635 assert!(sequencer.has_pending());
1636
1637 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1638 assert_eq!(ready.len(), 1);
1639 assert!(sequencer.has_pending());
1640
1641 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1642 assert_eq!(ready.len(), 2);
1643 assert!(!sequencer.has_pending());
1644 }
1645
1646 #[test]
1647 fn test_add_proof_with_gaps() {
1648 let mut sequencer = ProofSequencer::default();
1649 let proof1 = MultiProof::default();
1650 let proof3 = MultiProof::default();
1651 sequencer.next_sequence = 3;
1652
1653 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1654 assert_eq!(ready.len(), 1);
1655
1656 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1657 assert_eq!(ready.len(), 0);
1658 assert!(sequencer.has_pending());
1659 }
1660
1661 #[test]
1662 fn test_add_proof_duplicate_sequence() {
1663 let mut sequencer = ProofSequencer::default();
1664 let proof1 = MultiProof::default();
1665 let proof2 = MultiProof::default();
1666
1667 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1668 assert_eq!(ready.len(), 1);
1669
1670 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1671 assert_eq!(ready.len(), 0);
1672 assert!(!sequencer.has_pending());
1673 }
1674
1675 #[test]
1676 fn test_add_proof_batch_processing() {
1677 let mut sequencer = ProofSequencer::default();
1678 let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1679 sequencer.next_sequence = 5;
1680
1681 sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
1682 sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
1683 sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
1684 sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());
1685
1686 let ready =
1687 sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
1688 assert_eq!(ready.len(), 5);
1689 assert!(!sequencer.has_pending());
1690 }
1691
1692 fn create_get_proof_targets_state() -> HashedPostState {
1693 let mut state = HashedPostState::default();
1694
1695 let addr1 = B256::random();
1696 let addr2 = B256::random();
1697 state.accounts.insert(addr1, Some(Default::default()));
1698 state.accounts.insert(addr2, Some(Default::default()));
1699
1700 let mut storage = HashedStorage::default();
1701 let slot1 = B256::random();
1702 let slot2 = B256::random();
1703 storage.storage.insert(slot1, U256::ZERO);
1704 storage.storage.insert(slot2, U256::from(1));
1705 state.storages.insert(addr1, storage);
1706
1707 state
1708 }
1709
1710 fn unwrap_legacy_targets(targets: VersionedMultiProofTargets) -> MultiProofTargets {
1711 match targets {
1712 VersionedMultiProofTargets::Legacy(targets) => targets,
1713 VersionedMultiProofTargets::V2(_) => panic!("Expected Legacy targets"),
1714 }
1715 }
1716
1717 #[test]
1718 fn test_get_proof_targets_new_account_targets() {
1719 let state = create_get_proof_targets_state();
1720 let fetched = MultiProofTargets::default();
1721
1722 let targets = unwrap_legacy_targets(get_proof_targets(
1723 &state,
1724 &fetched,
1725 &MultiAddedRemovedKeys::new(),
1726 false,
1727 ));
1728
1729 assert_eq!(targets.len(), state.accounts.len());
1731 for addr in state.accounts.keys() {
1732 assert!(targets.contains_key(addr));
1733 }
1734 }
1735
1736 #[test]
1737 fn test_get_proof_targets_new_storage_targets() {
1738 let state = create_get_proof_targets_state();
1739 let fetched = MultiProofTargets::default();
1740
1741 let targets = unwrap_legacy_targets(get_proof_targets(
1742 &state,
1743 &fetched,
1744 &MultiAddedRemovedKeys::new(),
1745 false,
1746 ));
1747
1748 for (addr, storage) in &state.storages {
1750 assert!(targets.contains_key(addr));
1751 let target_slots = &targets[addr];
1752 assert_eq!(target_slots.len(), storage.storage.len());
1753 for slot in storage.storage.keys() {
1754 assert!(target_slots.contains(slot));
1755 }
1756 }
1757 }
1758
1759 #[test]
1760 fn test_get_proof_targets_filter_already_fetched_accounts() {
1761 let state = create_get_proof_targets_state();
1762 let mut fetched = MultiProofTargets::default();
1763
1764 let fetched_addr = state
1766 .accounts
1767 .keys()
1768 .find(|&&addr| !state.storages.contains_key(&addr))
1769 .expect("Should have an account without storage");
1770
1771 fetched.insert(*fetched_addr, HashSet::default());
1773
1774 let targets = unwrap_legacy_targets(get_proof_targets(
1775 &state,
1776 &fetched,
1777 &MultiAddedRemovedKeys::new(),
1778 false,
1779 ));
1780
1781 assert!(!targets.contains_key(fetched_addr));
1783 assert_eq!(targets.len(), state.accounts.len() - 1);
1785 }
1786
1787 #[test]
1788 fn test_get_proof_targets_filter_already_fetched_storage() {
1789 let state = create_get_proof_targets_state();
1790 let mut fetched = MultiProofTargets::default();
1791
1792 let (addr, storage) = state.storages.iter().next().unwrap();
1794 let mut fetched_slots = HashSet::default();
1795 let fetched_slot = *storage.storage.keys().next().unwrap();
1796 fetched_slots.insert(fetched_slot);
1797 fetched.insert(*addr, fetched_slots);
1798
1799 let targets = unwrap_legacy_targets(get_proof_targets(
1800 &state,
1801 &fetched,
1802 &MultiAddedRemovedKeys::new(),
1803 false,
1804 ));
1805
1806 let target_slots = &targets[addr];
1808 assert!(!target_slots.contains(&fetched_slot));
1809 assert_eq!(target_slots.len(), storage.storage.len() - 1);
1810 }
1811
1812 #[test]
1813 fn test_get_proof_targets_empty_state() {
1814 let state = HashedPostState::default();
1815 let fetched = MultiProofTargets::default();
1816
1817 let targets = unwrap_legacy_targets(get_proof_targets(
1818 &state,
1819 &fetched,
1820 &MultiAddedRemovedKeys::new(),
1821 false,
1822 ));
1823
1824 assert!(targets.is_empty());
1825 }
1826
1827 #[test]
1828 fn test_get_proof_targets_mixed_fetched_state() {
1829 let mut state = HashedPostState::default();
1830 let mut fetched = MultiProofTargets::default();
1831
1832 let addr1 = B256::random();
1833 let addr2 = B256::random();
1834 let slot1 = B256::random();
1835 let slot2 = B256::random();
1836
1837 state.accounts.insert(addr1, Some(Default::default()));
1838 state.accounts.insert(addr2, Some(Default::default()));
1839
1840 let mut storage = HashedStorage::default();
1841 storage.storage.insert(slot1, U256::ZERO);
1842 storage.storage.insert(slot2, U256::from(1));
1843 state.storages.insert(addr1, storage);
1844
1845 let mut fetched_slots = HashSet::default();
1846 fetched_slots.insert(slot1);
1847 fetched.insert(addr1, fetched_slots);
1848
1849 let targets = unwrap_legacy_targets(get_proof_targets(
1850 &state,
1851 &fetched,
1852 &MultiAddedRemovedKeys::new(),
1853 false,
1854 ));
1855
1856 assert!(targets.contains_key(&addr2));
1857 assert!(!targets[&addr1].contains(&slot1));
1858 assert!(targets[&addr1].contains(&slot2));
1859 }
1860
1861 #[test]
1862 fn test_get_proof_targets_unmodified_account_with_storage() {
1863 let mut state = HashedPostState::default();
1864 let fetched = MultiProofTargets::default();
1865
1866 let addr = B256::random();
1867 let slot1 = B256::random();
1868 let slot2 = B256::random();
1869
1870 let mut storage = HashedStorage::default();
1873 storage.storage.insert(slot1, U256::from(1));
1874 storage.storage.insert(slot2, U256::from(2));
1875 state.storages.insert(addr, storage);
1876
1877 assert!(!state.accounts.contains_key(&addr));
1878 assert!(!fetched.contains_key(&addr));
1879
1880 let targets = unwrap_legacy_targets(get_proof_targets(
1881 &state,
1882 &fetched,
1883 &MultiAddedRemovedKeys::new(),
1884 false,
1885 ));
1886
1887 assert!(targets.contains_key(&addr));
1889
1890 let target_slots = &targets[&addr];
1891 assert_eq!(target_slots.len(), 2);
1892 assert!(target_slots.contains(&slot1));
1893 assert!(target_slots.contains(&slot2));
1894 }
1895
1896 #[test]
1897 fn test_get_proof_targets_with_removed_storage_keys() {
1898 let mut state = HashedPostState::default();
1899 let mut fetched = MultiProofTargets::default();
1900 let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1901
1902 let addr = B256::random();
1903 let slot1 = B256::random();
1904 let slot2 = B256::random();
1905
1906 state.accounts.insert(addr, Some(Default::default()));
1908
1909 let mut storage = HashedStorage::default();
1911 storage.storage.insert(slot1, U256::from(100));
1912 storage.storage.insert(slot2, U256::from(200));
1913 state.storages.insert(addr, storage);
1914
1915 let mut fetched_slots = HashSet::default();
1917 fetched_slots.insert(slot1);
1918 fetched.insert(addr, fetched_slots);
1919
1920 let mut removed_state = HashedPostState::default();
1922 let mut removed_storage = HashedStorage::default();
1923 removed_storage.storage.insert(slot1, U256::ZERO); removed_state.storages.insert(addr, removed_storage);
1925 multi_added_removed_keys.update_with_state(&removed_state);
1926
1927 let targets = unwrap_legacy_targets(get_proof_targets(
1928 &state,
1929 &fetched,
1930 &multi_added_removed_keys,
1931 false,
1932 ));
1933
1934 assert!(targets.contains_key(&addr));
1936 let target_slots = &targets[&addr];
1937 assert_eq!(target_slots.len(), 2);
1938 assert!(target_slots.contains(&slot1)); assert!(target_slots.contains(&slot2)); }
1941
1942 #[test]
1943 fn test_get_proof_targets_with_wiped_storage() {
1944 let mut state = HashedPostState::default();
1945 let fetched = MultiProofTargets::default();
1946 let multi_added_removed_keys = MultiAddedRemovedKeys::new();
1947
1948 let addr = B256::random();
1949 let slot1 = B256::random();
1950
1951 state.accounts.insert(addr, Some(Default::default()));
1953
1954 let mut storage = HashedStorage::new(true);
1956 storage.storage.insert(slot1, U256::from(100));
1957 state.storages.insert(addr, storage);
1958
1959 let targets = unwrap_legacy_targets(get_proof_targets(
1960 &state,
1961 &fetched,
1962 &multi_added_removed_keys,
1963 false,
1964 ));
1965
1966 assert!(targets.contains_key(&addr));
1968 let target_slots = &targets[&addr];
1969 assert_eq!(target_slots.len(), 1);
1970 assert!(target_slots.contains(&slot1));
1971 }
1972
1973 #[test]
1974 fn test_get_proof_targets_removed_keys_not_in_state_update() {
1975 let mut state = HashedPostState::default();
1976 let mut fetched = MultiProofTargets::default();
1977 let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1978
1979 let addr = B256::random();
1980 let slot1 = B256::random();
1981 let slot2 = B256::random();
1982 let slot3 = B256::random();
1983
1984 state.accounts.insert(addr, Some(Default::default()));
1986
1987 let mut storage = HashedStorage::default();
1989 storage.storage.insert(slot1, U256::from(100));
1990 storage.storage.insert(slot2, U256::from(200));
1991 state.storages.insert(addr, storage);
1992
1993 let mut fetched_slots = HashSet::default();
1995 fetched_slots.insert(slot1);
1996 fetched_slots.insert(slot2);
1997 fetched_slots.insert(slot3); fetched.insert(addr, fetched_slots);
1999
2000 let mut removed_state = HashedPostState::default();
2002 let mut removed_storage = HashedStorage::default();
2003 removed_storage.storage.insert(slot3, U256::ZERO);
2004 removed_state.storages.insert(addr, removed_storage);
2005 multi_added_removed_keys.update_with_state(&removed_state);
2006
2007 let targets = unwrap_legacy_targets(get_proof_targets(
2008 &state,
2009 &fetched,
2010 &multi_added_removed_keys,
2011 false,
2012 ));
2013
2014 assert!(!targets.contains_key(&addr));
2016 }
2017
2018 #[test]
2020 fn test_prefetch_proofs_batching() {
2021 let test_provider_factory = create_test_provider_factory();
2022 let mut task = create_test_state_root_task(test_provider_factory);
2023
2024 let addr1 = B256::random();
2026 let addr2 = B256::random();
2027 let addr3 = B256::random();
2028
2029 let mut targets1 = MultiProofTargets::default();
2030 targets1.insert(addr1, HashSet::default());
2031
2032 let mut targets2 = MultiProofTargets::default();
2033 targets2.insert(addr2, HashSet::default());
2034
2035 let mut targets3 = MultiProofTargets::default();
2036 targets3.insert(addr3, HashSet::default());
2037
2038 let tx = task.tx.clone();
2039 tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1)))
2040 .unwrap();
2041 tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2)))
2042 .unwrap();
2043 tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets3)))
2044 .unwrap();
2045
2046 let proofs_requested =
2047 if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
2048 let mut merged_targets = targets;
2050 let mut num_batched = 1;
2051 while let Ok(MultiProofMessage::PrefetchProofs(next_targets)) = task.rx.try_recv() {
2052 merged_targets.extend(next_targets);
2053 num_batched += 1;
2054 }
2055
2056 assert_eq!(num_batched, 3);
2057 assert_eq!(merged_targets.len(), 3);
2058 let legacy_targets = unwrap_legacy_targets(merged_targets);
2059 assert!(legacy_targets.contains_key(&addr1));
2060 assert!(legacy_targets.contains_key(&addr2));
2061 assert!(legacy_targets.contains_key(&addr3));
2062
2063 task.on_prefetch_proof(VersionedMultiProofTargets::Legacy(legacy_targets))
2064 } else {
2065 panic!("Expected PrefetchProofs message");
2066 };
2067
2068 assert_eq!(proofs_requested, 1);
2069 }
2070
2071 #[test]
2073 fn test_batching_preserves_ordering_with_different_message_type() {
2074 use alloy_evm::block::StateChangeSource;
2075 use revm_state::Account;
2076
2077 let test_provider_factory = create_test_provider_factory();
2078 let task = create_test_state_root_task(test_provider_factory);
2079
2080 let addr1 = B256::random();
2081 let addr2 = B256::random();
2082 let addr3 = B256::random();
2083 let state_addr1 = alloy_primitives::Address::random();
2084 let state_addr2 = alloy_primitives::Address::random();
2085
2086 let mut targets1 = MultiProofTargets::default();
2088 targets1.insert(addr1, HashSet::default());
2089
2090 let mut targets2 = MultiProofTargets::default();
2091 targets2.insert(addr2, HashSet::default());
2092
2093 let mut targets3 = MultiProofTargets::default();
2094 targets3.insert(addr3, HashSet::default());
2095
2096 let mut state_update1 = EvmState::default();
2098 state_update1.insert(
2099 state_addr1,
2100 Account {
2101 info: revm_state::AccountInfo {
2102 balance: U256::from(100),
2103 nonce: 1,
2104 code_hash: Default::default(),
2105 code: Default::default(),
2106 account_id: None,
2107 },
2108 original_info: Box::new(revm_state::AccountInfo::default()),
2109 transaction_id: Default::default(),
2110 storage: Default::default(),
2111 status: revm_state::AccountStatus::Touched,
2112 },
2113 );
2114
2115 let mut state_update2 = EvmState::default();
2117 state_update2.insert(
2118 state_addr2,
2119 Account {
2120 info: revm_state::AccountInfo {
2121 balance: U256::from(200),
2122 nonce: 2,
2123 code_hash: Default::default(),
2124 code: Default::default(),
2125 account_id: None,
2126 },
2127 original_info: Box::new(revm_state::AccountInfo::default()),
2128 transaction_id: Default::default(),
2129 storage: Default::default(),
2130 status: revm_state::AccountStatus::Touched,
2131 },
2132 );
2133
2134 let source = StateChangeSource::Transaction(42);
2135
2136 let tx = task.tx.clone();
2138 tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets1)))
2139 .unwrap();
2140 tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(targets2)))
2141 .unwrap();
2142 tx.send(MultiProofMessage::StateUpdate(source.into(), state_update1)).unwrap();
2143 tx.send(MultiProofMessage::StateUpdate(source.into(), state_update2)).unwrap();
2144 tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(
2145 targets3.clone(),
2146 )))
2147 .unwrap();
2148
2149 let mut pending_msg: Option<MultiProofMessage> = None;
2151 if let Ok(MultiProofMessage::PrefetchProofs(targets)) = task.rx.recv() {
2152 let mut merged_targets = targets;
2153 let mut num_batched = 1;
2154
2155 loop {
2156 match task.rx.try_recv() {
2157 Ok(MultiProofMessage::PrefetchProofs(next_targets)) => {
2158 merged_targets.extend(next_targets);
2159 num_batched += 1;
2160 }
2161 Ok(other_msg) => {
2162 pending_msg = Some(other_msg);
2164 break;
2165 }
2166 Err(_) => break,
2167 }
2168 }
2169
2170 assert_eq!(num_batched, 2, "Should batch only until different message type");
2172 assert_eq!(merged_targets.len(), 2);
2173 let legacy_targets = unwrap_legacy_targets(merged_targets);
2174 assert!(legacy_targets.contains_key(&addr1));
2175 assert!(legacy_targets.contains_key(&addr2));
2176 assert!(!legacy_targets.contains_key(&addr3), "addr3 should NOT be in first batch");
2177 } else {
2178 panic!("Expected PrefetchProofs message");
2179 }
2180
2181 match pending_msg {
2183 Some(MultiProofMessage::StateUpdate(_src, update)) => {
2184 assert!(update.contains_key(&state_addr1), "Should be first StateUpdate");
2185 }
2186 _ => panic!("StateUpdate1 was lost or reordered! The ordering fix is broken."),
2187 }
2188
2189 match task.rx.try_recv() {
2191 Ok(MultiProofMessage::StateUpdate(_src, update)) => {
2192 assert!(update.contains_key(&state_addr2), "Should be second StateUpdate");
2193 }
2194 _ => panic!("StateUpdate2 was lost!"),
2195 }
2196
2197 match task.rx.try_recv() {
2199 Ok(MultiProofMessage::PrefetchProofs(targets)) => {
2200 assert_eq!(targets.len(), 1);
2201 let legacy_targets = unwrap_legacy_targets(targets);
2202 assert!(legacy_targets.contains_key(&addr3));
2203 }
2204 _ => panic!("PrefetchProofs3 was lost!"),
2205 }
2206 }
2207
2208 #[test]
2210 fn test_pending_message_processed_before_next_iteration() {
2211 use alloy_evm::block::StateChangeSource;
2212 use revm_state::Account;
2213
2214 let test_provider_factory = create_test_provider_factory();
2215 let test_provider = create_cached_provider(test_provider_factory.clone());
2216 let mut task = create_test_state_root_task(test_provider_factory);
2217
2218 let prefetch_addr1 = B256::random();
2220 let prefetch_addr2 = B256::random();
2221 let mut prefetch1 = MultiProofTargets::default();
2222 prefetch1.insert(prefetch_addr1, HashSet::default());
2223 let mut prefetch2 = MultiProofTargets::default();
2224 prefetch2.insert(prefetch_addr2, HashSet::default());
2225
2226 let state_addr = alloy_primitives::Address::random();
2227 let mut state_update = EvmState::default();
2228 state_update.insert(
2229 state_addr,
2230 Account {
2231 info: revm_state::AccountInfo {
2232 balance: U256::from(42),
2233 nonce: 1,
2234 code_hash: Default::default(),
2235 code: Default::default(),
2236 account_id: None,
2237 },
2238 original_info: Box::new(revm_state::AccountInfo::default()),
2239 transaction_id: Default::default(),
2240 storage: Default::default(),
2241 status: revm_state::AccountStatus::Touched,
2242 },
2243 );
2244
2245 let source = StateChangeSource::Transaction(99);
2246
2247 let tx = task.tx.clone();
2248 tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(prefetch1)))
2249 .unwrap();
2250 tx.send(MultiProofMessage::StateUpdate(source.into(), state_update)).unwrap();
2251 tx.send(MultiProofMessage::PrefetchProofs(VersionedMultiProofTargets::Legacy(
2252 prefetch2.clone(),
2253 )))
2254 .unwrap();
2255
2256 let mut ctx = MultiproofBatchCtx::new(Instant::now());
2257 let mut batch_metrics = MultiproofBatchMetrics::default();
2258
2259 let first = task.rx.recv().unwrap();
2261 assert!(matches!(first, MultiProofMessage::PrefetchProofs(_)));
2262 assert!(!task.process_multiproof_message(
2263 first,
2264 &mut ctx,
2265 &mut batch_metrics,
2266 &test_provider
2267 ));
2268 let pending = ctx.pending_msg.take().expect("pending message captured");
2269 assert!(matches!(pending, MultiProofMessage::StateUpdate(_, _)));
2270
2271 assert!(!task.process_multiproof_message(
2274 pending,
2275 &mut ctx,
2276 &mut batch_metrics,
2277 &test_provider
2278 ));
2279
2280 assert!(ctx.pending_msg.is_none());
2282
2283 match task.rx.try_recv() {
2285 Ok(MultiProofMessage::PrefetchProofs(targets)) => {
2286 assert_eq!(targets.len(), 1);
2287 let legacy_targets = unwrap_legacy_targets(targets);
2288 assert!(legacy_targets.contains_key(&prefetch_addr2));
2289 }
2290 other => panic!("Expected PrefetchProofs2 in channel, got {:?}", other),
2291 }
2292 }
2293
2294 #[test]
2296 fn test_bal_message_processing() {
2297 let test_provider_factory = create_test_provider_factory();
2298 let test_provider = create_cached_provider(test_provider_factory.clone());
2299 let mut task = create_test_state_root_task(test_provider_factory);
2300
2301 let account_address = Address::random();
2303 let account_changes = AccountChanges {
2304 address: account_address,
2305 balance_changes: vec![BalanceChange::new(0, U256::from(1000))],
2306 nonce_changes: vec![],
2307 code_changes: vec![],
2308 storage_changes: vec![],
2309 storage_reads: vec![],
2310 };
2311
2312 let bal = Arc::new(vec![account_changes]);
2313
2314 let mut ctx = MultiproofBatchCtx::new(Instant::now());
2315 let mut batch_metrics = MultiproofBatchMetrics::default();
2316
2317 let should_finish = task.process_multiproof_message(
2318 MultiProofMessage::BlockAccessList(bal),
2319 &mut ctx,
2320 &mut batch_metrics,
2321 &test_provider,
2322 );
2323
2324 assert!(ctx.updates_finished_time.is_some());
2326
2327 assert!(batch_metrics.state_update_proofs_requested > 0);
2329
2330 assert!(!should_finish, "Should continue waiting for proofs");
2332 }
2333}