1use crate::tree::payload_processor::executor::WorkloadExecutor;
4use alloy_evm::block::StateChangeSource;
5use alloy_primitives::{
6 keccak256,
7 map::{B256Set, HashSet},
8 B256,
9};
10use derive_more::derive::Deref;
11use metrics::Histogram;
12use reth_errors::ProviderError;
13use reth_metrics::Metrics;
14use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, FactoryTx};
15use reth_revm::state::EvmState;
16use reth_trie::{
17 added_removed_keys::MultiAddedRemovedKeys, prefix_set::TriePrefixSetsMut,
18 updates::TrieUpdatesSorted, DecodedMultiProof, HashedPostState, HashedPostStateSorted,
19 HashedStorage, MultiProofTargets, TrieInput,
20};
21use reth_trie_parallel::{proof::ParallelProof, proof_task::ProofTaskManagerHandle};
22use std::{
23 collections::{BTreeMap, VecDeque},
24 ops::DerefMut,
25 sync::{
26 mpsc::{channel, Receiver, Sender},
27 Arc,
28 },
29 time::{Duration, Instant},
30};
31use tracing::{debug, error, trace};
32
/// Chunk size passed to `chunks(..)` when prefetch targets and not-yet-fetched state updates are
/// split into separate multiproof requests.
const MULTIPROOF_TARGETS_CHUNK_SIZE: usize = 10;
35
36#[derive(Default, Debug)]
39pub struct SparseTrieUpdate {
40 pub(crate) state: HashedPostState,
42 pub(crate) multiproof: DecodedMultiProof,
44}
45
46impl SparseTrieUpdate {
47 pub(super) fn is_empty(&self) -> bool {
49 self.state.is_empty() && self.multiproof.is_empty()
50 }
51
52 #[cfg(test)]
54 pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
55 Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
56 }
57
58 pub(super) fn extend(&mut self, other: Self) {
60 self.state.extend(other.state);
61 self.multiproof.extend(other.multiproof);
62 }
63}
64
65#[derive(Debug, Clone)]
67pub(super) struct MultiProofConfig<Factory> {
68 pub consistent_view: ConsistentDbView<Factory>,
70 pub nodes_sorted: Arc<TrieUpdatesSorted>,
73 pub state_sorted: Arc<HashedPostStateSorted>,
75 pub prefix_sets: Arc<TriePrefixSetsMut>,
79}
80
81impl<Factory> MultiProofConfig<Factory> {
82 pub(super) fn new_from_input(
87 consistent_view: ConsistentDbView<Factory>,
88 mut input: TrieInput,
89 ) -> (TrieInput, Self) {
90 let config = Self {
91 consistent_view,
92 nodes_sorted: Arc::new(input.nodes.drain_into_sorted()),
93 state_sorted: Arc::new(input.state.drain_into_sorted()),
94 prefix_sets: Arc::new(input.prefix_sets.clone()),
95 };
96 (input.cleared(), config)
97 }
98}
99
/// Messages handled by the receive loop in `MultiProofTask::run`.
#[derive(Debug)]
pub(super) enum MultiProofMessage {
    /// Request to prefetch proofs for the given targets.
    PrefetchProofs(MultiProofTargets),
    /// A state change from the given source together with the changed EVM state.
    StateUpdate(StateChangeSource, EvmState),
    /// A state update that is delivered without a calculated proof; the task pairs it with a
    /// default (empty) multiproof.
    EmptyProof {
        /// Position of this update in the proof delivery order.
        sequence_number: u64,
        /// Hashed state carried by the update.
        state: HashedPostState,
    },
    /// Result of a finished proof calculation.
    ProofCalculated(Box<ProofCalculated>),
    /// A proof calculation failed; the task logs the error and stops.
    ProofCalculationError(ProviderError),
    /// No further state updates will arrive. Sent when a `StateHookSender` is dropped.
    FinishedStateUpdates,
}
127
/// Payload of [`MultiProofMessage::ProofCalculated`]: the outcome of one spawned proof task.
#[derive(Debug)]
pub(super) struct ProofCalculated {
    /// Sequence number that was assigned to the proof request.
    sequence_number: u64,
    /// The state update together with the proof calculated for it.
    update: SparseTrieUpdate,
    /// Wall-clock time the proof calculation took.
    elapsed: Duration,
}
138
139#[derive(Debug, Default)]
141struct ProofSequencer {
142 next_sequence: u64,
144 next_to_deliver: u64,
146 pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
148}
149
150impl ProofSequencer {
151 const fn next_sequence(&mut self) -> u64 {
153 let seq = self.next_sequence;
154 self.next_sequence += 1;
155 seq
156 }
157
158 fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
161 if sequence >= self.next_to_deliver {
162 self.pending_proofs.insert(sequence, update);
163 }
164
165 if !self.pending_proofs.contains_key(&self.next_to_deliver) {
167 return Vec::new()
168 }
169
170 let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
171 let mut current_sequence = self.next_to_deliver;
172
173 while let Some(pending) = self.pending_proofs.remove(¤t_sequence) {
175 consecutive_proofs.push(pending);
176 current_sequence += 1;
177
178 if !self.pending_proofs.contains_key(¤t_sequence) {
180 break;
181 }
182 }
183
184 self.next_to_deliver += consecutive_proofs.len() as u64;
185
186 consecutive_proofs
187 }
188
189 pub(crate) fn has_pending(&self) -> bool {
191 !self.pending_proofs.is_empty()
192 }
193}
194
195#[derive(Deref, Debug)]
201pub(super) struct StateHookSender(Sender<MultiProofMessage>);
202
203impl StateHookSender {
204 pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
205 Self(inner)
206 }
207}
208
209impl Drop for StateHookSender {
210 fn drop(&mut self) {
211 let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
213 }
214}
215
216pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
217 let mut hashed_state = HashedPostState::with_capacity(update.len());
218
219 for (address, account) in update {
220 if account.is_touched() {
221 let hashed_address = keccak256(address);
222 trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update");
223
224 let destroyed = account.is_selfdestructed();
225 let info = if destroyed { None } else { Some(account.info.into()) };
226 hashed_state.accounts.insert(hashed_address, info);
227
228 let mut changed_storage_iter = account
229 .storage
230 .into_iter()
231 .filter(|(_slot, value)| value.is_changed())
232 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
233 .peekable();
234
235 if destroyed {
236 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
237 } else if changed_storage_iter.peek().is_some() {
238 hashed_state
239 .storages
240 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
241 }
242 }
243 }
244
245 hashed_state
246}
247
248#[derive(Debug)]
250enum PendingMultiproofTask<Factory> {
251 Storage(StorageMultiproofInput<Factory>),
253 Regular(MultiproofInput<Factory>),
255}
256
257impl<Factory> PendingMultiproofTask<Factory> {
258 const fn proof_sequence_number(&self) -> u64 {
260 match self {
261 Self::Storage(input) => input.proof_sequence_number,
262 Self::Regular(input) => input.proof_sequence_number,
263 }
264 }
265
266 fn proof_targets_is_empty(&self) -> bool {
268 match self {
269 Self::Storage(input) => input.proof_targets.is_empty(),
270 Self::Regular(input) => input.proof_targets.is_empty(),
271 }
272 }
273
274 fn send_empty_proof(self) {
276 match self {
277 Self::Storage(input) => input.send_empty_proof(),
278 Self::Regular(input) => input.send_empty_proof(),
279 }
280 }
281}
282
283impl<Factory> From<StorageMultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
284 fn from(input: StorageMultiproofInput<Factory>) -> Self {
285 Self::Storage(input)
286 }
287}
288
289impl<Factory> From<MultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
290 fn from(input: MultiproofInput<Factory>) -> Self {
291 Self::Regular(input)
292 }
293}
294
295#[derive(Debug)]
297struct StorageMultiproofInput<Factory> {
298 config: MultiProofConfig<Factory>,
299 source: Option<StateChangeSource>,
300 hashed_state_update: HashedPostState,
301 hashed_address: B256,
302 proof_targets: B256Set,
303 proof_sequence_number: u64,
304 state_root_message_sender: Sender<MultiProofMessage>,
305 multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
306}
307
308impl<Factory> StorageMultiproofInput<Factory> {
309 fn send_empty_proof(self) {
311 let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
312 sequence_number: self.proof_sequence_number,
313 state: self.hashed_state_update,
314 });
315 }
316}
317
318#[derive(Debug)]
320struct MultiproofInput<Factory> {
321 config: MultiProofConfig<Factory>,
322 source: Option<StateChangeSource>,
323 hashed_state_update: HashedPostState,
324 proof_targets: MultiProofTargets,
325 proof_sequence_number: u64,
326 state_root_message_sender: Sender<MultiProofMessage>,
327 multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
328}
329
330impl<Factory> MultiproofInput<Factory> {
331 fn send_empty_proof(self) {
333 let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
334 sequence_number: self.proof_sequence_number,
335 state: self.hashed_state_update,
336 });
337 }
338}
339
340#[derive(Debug)]
345pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
346 max_concurrent: usize,
348 inflight: usize,
350 pending: VecDeque<PendingMultiproofTask<Factory>>,
352 executor: WorkloadExecutor,
354 storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
356 metrics: MultiProofTaskMetrics,
358}
359
360impl<Factory> MultiproofManager<Factory>
361where
362 Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
363{
364 fn new(
366 executor: WorkloadExecutor,
367 metrics: MultiProofTaskMetrics,
368 storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
369 max_concurrent: usize,
370 ) -> Self {
371 Self {
372 pending: VecDeque::with_capacity(max_concurrent),
373 max_concurrent,
374 executor,
375 inflight: 0,
376 metrics,
377 storage_proof_task_handle,
378 }
379 }
380
381 fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
384 if input.proof_targets_is_empty() {
386 debug!(
387 sequence_number = input.proof_sequence_number(),
388 "No proof targets, sending empty multiproof back immediately"
389 );
390 input.send_empty_proof();
391 return
392 }
393
394 if self.inflight >= self.max_concurrent {
395 self.pending.push_back(input);
396 self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
397 return;
398 }
399
400 self.spawn_multiproof_task(input);
401 }
402
403 fn on_calculation_complete(&mut self) {
406 self.inflight = self.inflight.saturating_sub(1);
407 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
408
409 if let Some(input) = self.pending.pop_front() {
410 self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
411 self.spawn_multiproof_task(input);
412 }
413 }
414
415 fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask<Factory>) {
418 match input {
419 PendingMultiproofTask::Storage(storage_input) => {
420 self.spawn_storage_proof(storage_input);
421 }
422 PendingMultiproofTask::Regular(multiproof_input) => {
423 self.spawn_multiproof(multiproof_input);
424 }
425 }
426 }
427
428 fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput<Factory>) {
430 let StorageMultiproofInput {
431 config,
432 source,
433 hashed_state_update,
434 hashed_address,
435 proof_targets,
436 proof_sequence_number,
437 state_root_message_sender,
438 multi_added_removed_keys,
439 } = storage_multiproof_input;
440
441 let storage_proof_task_handle = self.storage_proof_task_handle.clone();
442
443 self.executor.spawn_blocking(move || {
444 let storage_targets = proof_targets.len();
445
446 trace!(
447 target: "engine::root",
448 proof_sequence_number,
449 ?proof_targets,
450 storage_targets,
451 "Starting dedicated storage proof calculation",
452 );
453 let start = Instant::now();
454 let result = ParallelProof::new(
455 config.consistent_view,
456 config.nodes_sorted,
457 config.state_sorted,
458 config.prefix_sets,
459 storage_proof_task_handle.clone(),
460 )
461 .with_branch_node_masks(true)
462 .with_multi_added_removed_keys(Some(multi_added_removed_keys))
463 .decoded_storage_proof(hashed_address, proof_targets);
464 let elapsed = start.elapsed();
465 trace!(
466 target: "engine::root",
467 proof_sequence_number,
468 ?elapsed,
469 ?source,
470 storage_targets,
471 "Storage multiproofs calculated",
472 );
473
474 match result {
475 Ok(proof) => {
476 let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
477 Box::new(ProofCalculated {
478 sequence_number: proof_sequence_number,
479 update: SparseTrieUpdate {
480 state: hashed_state_update,
481 multiproof: DecodedMultiProof::from_storage_proof(
482 hashed_address,
483 proof,
484 ),
485 },
486 elapsed,
487 }),
488 ));
489 }
490 Err(error) => {
491 let _ = state_root_message_sender
492 .send(MultiProofMessage::ProofCalculationError(error.into()));
493 }
494 }
495 });
496
497 self.inflight += 1;
498 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
499 }
500
501 fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput<Factory>) {
503 let MultiproofInput {
504 config,
505 source,
506 hashed_state_update,
507 proof_targets,
508 proof_sequence_number,
509 state_root_message_sender,
510 multi_added_removed_keys,
511 } = multiproof_input;
512 let storage_proof_task_handle = self.storage_proof_task_handle.clone();
513
514 self.executor.spawn_blocking(move || {
515 let account_targets = proof_targets.len();
516 let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
517
518 trace!(
519 target: "engine::root",
520 proof_sequence_number,
521 ?proof_targets,
522 account_targets,
523 storage_targets,
524 ?source,
525 "Starting multiproof calculation",
526 );
527
528 let start = Instant::now();
529 let result = ParallelProof::new(
530 config.consistent_view,
531 config.nodes_sorted,
532 config.state_sorted,
533 config.prefix_sets,
534 storage_proof_task_handle.clone(),
535 )
536 .with_branch_node_masks(true)
537 .with_multi_added_removed_keys(multi_added_removed_keys)
538 .decoded_multiproof(proof_targets);
539 let elapsed = start.elapsed();
540 trace!(
541 target: "engine::root",
542 proof_sequence_number,
543 ?elapsed,
544 ?source,
545 account_targets,
546 storage_targets,
547 "Multiproof calculated",
548 );
549
550 match result {
551 Ok(proof) => {
552 let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
553 Box::new(ProofCalculated {
554 sequence_number: proof_sequence_number,
555 update: SparseTrieUpdate {
556 state: hashed_state_update,
557 multiproof: proof,
558 },
559 elapsed,
560 }),
561 ));
562 }
563 Err(error) => {
564 let _ = state_root_message_sender
565 .send(MultiProofMessage::ProofCalculationError(error.into()));
566 }
567 }
568 });
569
570 self.inflight += 1;
571 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
572 }
573}
574
575#[derive(Metrics, Clone)]
576#[metrics(scope = "tree.root")]
577pub(crate) struct MultiProofTaskMetrics {
578 pub inflight_multiproofs_histogram: Histogram,
580 pub pending_multiproofs_histogram: Histogram,
582
583 pub prefetch_proof_targets_accounts_histogram: Histogram,
585 pub prefetch_proof_targets_storages_histogram: Histogram,
587 pub prefetch_proof_chunks_histogram: Histogram,
589
590 pub state_update_proof_targets_accounts_histogram: Histogram,
592 pub state_update_proof_targets_storages_histogram: Histogram,
594 pub state_update_proof_chunks_histogram: Histogram,
596
597 pub proof_calculation_duration_histogram: Histogram,
599
600 pub sparse_trie_update_duration_histogram: Histogram,
602 pub sparse_trie_final_update_duration_histogram: Histogram,
604 pub sparse_trie_total_duration_histogram: Histogram,
606
607 pub state_updates_received_histogram: Histogram,
609 pub proofs_processed_histogram: Histogram,
611 pub multiproof_task_total_duration_histogram: Histogram,
613 pub first_update_wait_time_histogram: Histogram,
615 pub last_proof_wait_time_histogram: Histogram,
617}
618
619#[derive(Debug)]
629pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
630 config: MultiProofConfig<Factory>,
632 rx: Receiver<MultiProofMessage>,
634 tx: Sender<MultiProofMessage>,
636 to_sparse_trie: Sender<SparseTrieUpdate>,
638 fetched_proof_targets: MultiProofTargets,
640 multi_added_removed_keys: MultiAddedRemovedKeys,
642 proof_sequencer: ProofSequencer,
644 multiproof_manager: MultiproofManager<Factory>,
646 metrics: MultiProofTaskMetrics,
648}
649
650impl<Factory> MultiProofTask<Factory>
651where
652 Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
653{
654 pub(super) fn new(
656 config: MultiProofConfig<Factory>,
657 executor: WorkloadExecutor,
658 proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
659 to_sparse_trie: Sender<SparseTrieUpdate>,
660 max_concurrency: usize,
661 ) -> Self {
662 let (tx, rx) = channel();
663 let metrics = MultiProofTaskMetrics::default();
664
665 Self {
666 config,
667 rx,
668 tx,
669 to_sparse_trie,
670 fetched_proof_targets: Default::default(),
671 multi_added_removed_keys: MultiAddedRemovedKeys::new(),
672 proof_sequencer: ProofSequencer::default(),
673 multiproof_manager: MultiproofManager::new(
674 executor,
675 metrics.clone(),
676 proof_task_handle,
677 max_concurrency,
678 ),
679 metrics,
680 }
681 }
682
683 pub(super) fn state_root_message_sender(&self) -> Sender<MultiProofMessage> {
685 self.tx.clone()
686 }
687
688 fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
692 let proof_targets = self.get_prefetch_proof_targets(targets);
693 self.fetched_proof_targets.extend_ref(&proof_targets);
694
695 self.multi_added_removed_keys.touch_accounts(proof_targets.keys().copied());
699
700 let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
702
703 self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
704 self.metrics
705 .prefetch_proof_targets_storages_histogram
706 .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
707
708 let mut chunks = 0;
710 for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
711 self.multiproof_manager.spawn_or_queue(
712 MultiproofInput {
713 config: self.config.clone(),
714 source: None,
715 hashed_state_update: Default::default(),
716 proof_targets: proof_targets_chunk,
717 proof_sequence_number: self.proof_sequencer.next_sequence(),
718 state_root_message_sender: self.tx.clone(),
719 multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
720 }
721 .into(),
722 );
723 chunks += 1;
724 }
725 self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
726
727 chunks
728 }
729
730 fn is_done(
732 &self,
733 proofs_processed: u64,
734 state_update_proofs_requested: u64,
735 prefetch_proofs_requested: u64,
736 updates_finished: bool,
737 ) -> bool {
738 let all_proofs_processed =
739 proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
740 let no_pending = !self.proof_sequencer.has_pending();
741 debug!(
742 target: "engine::root",
743 proofs_processed,
744 state_update_proofs_requested,
745 prefetch_proofs_requested,
746 no_pending,
747 updates_finished,
748 "Checking end condition"
749 );
750 all_proofs_processed && no_pending && updates_finished
751 }
752
753 fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
755 let mut duplicates = 0;
759
760 targets.retain(|hashed_address, target_storage| {
762 let keep = self
763 .fetched_proof_targets
764 .get(hashed_address)
765 .is_none_or(|fetched_storage| {
767 !target_storage.is_subset(fetched_storage)
769 });
770
771 if !keep {
772 duplicates += target_storage.len();
773 }
774
775 keep
776 });
777
778 for (hashed_address, target_storage) in targets.deref_mut() {
780 let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
781 continue
784 };
785
786 let prev_target_storage_len = target_storage.len();
787
788 target_storage.retain(|slot| !fetched_storage.contains(slot));
792
793 duplicates += prev_target_storage_len - target_storage.len();
794 }
795
796 if duplicates > 0 {
797 trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
798 }
799
800 targets
801 }
802
803 fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
807 let hashed_state_update = evm_state_to_hashed_post_state(update);
808
809 self.multi_added_removed_keys.update_with_state(&hashed_state_update);
811
812 let (fetched_state_update, not_fetched_state_update) = hashed_state_update
815 .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
816
817 let mut state_updates = 0;
818 if !fetched_state_update.is_empty() {
821 let _ = self.tx.send(MultiProofMessage::EmptyProof {
822 sequence_number: self.proof_sequencer.next_sequence(),
823 state: fetched_state_update,
824 });
825 state_updates += 1;
826 }
827
828 let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
830
831 let mut chunks = 0;
833 let mut spawned_proof_targets = MultiProofTargets::default();
834 for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
835 let proof_targets =
836 get_proof_targets(&chunk, &self.fetched_proof_targets, &multi_added_removed_keys);
837 spawned_proof_targets.extend_ref(&proof_targets);
838
839 self.multiproof_manager.spawn_or_queue(
840 MultiproofInput {
841 config: self.config.clone(),
842 source: Some(source),
843 hashed_state_update: chunk,
844 proof_targets,
845 proof_sequence_number: self.proof_sequencer.next_sequence(),
846 state_root_message_sender: self.tx.clone(),
847 multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
848 }
849 .into(),
850 );
851 chunks += 1;
852 }
853
854 self.metrics
855 .state_update_proof_targets_accounts_histogram
856 .record(spawned_proof_targets.len() as f64);
857 self.metrics
858 .state_update_proof_targets_storages_histogram
859 .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
860 self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
861
862 self.fetched_proof_targets.extend(spawned_proof_targets);
863
864 state_updates + chunks
865 }
866
867 fn on_proof(
869 &mut self,
870 sequence_number: u64,
871 update: SparseTrieUpdate,
872 ) -> Option<SparseTrieUpdate> {
873 let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
874
875 ready_proofs
876 .into_iter()
877 .reduce(|mut acc_update, update| {
879 acc_update.extend(update);
880 acc_update
881 })
882 .filter(|proof| !proof.is_empty())
884 }
885
886 pub(crate) fn run(mut self) {
921 let mut prefetch_proofs_requested = 0;
923 let mut state_update_proofs_requested = 0;
924 let mut proofs_processed = 0;
925
926 let mut updates_finished = false;
927
928 let start = Instant::now();
930
931 let mut first_update_time = None;
933 let mut updates_finished_time = None;
935
936 loop {
937 trace!(target: "engine::root", "entering main channel receiving loop");
938 match self.rx.recv() {
939 Ok(message) => match message {
940 MultiProofMessage::PrefetchProofs(targets) => {
941 trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
942 if first_update_time.is_none() {
943 self.metrics
945 .first_update_wait_time_histogram
946 .record(start.elapsed().as_secs_f64());
947 first_update_time = Some(Instant::now());
948 debug!(target: "engine::root", "Started state root calculation");
949 }
950
951 let account_targets = targets.len();
952 let storage_targets =
953 targets.values().map(|slots| slots.len()).sum::<usize>();
954 prefetch_proofs_requested += self.on_prefetch_proof(targets);
955 debug!(
956 target: "engine::root",
957 account_targets,
958 storage_targets,
959 prefetch_proofs_requested,
960 "Prefetching proofs"
961 );
962 }
963 MultiProofMessage::StateUpdate(source, update) => {
964 trace!(target: "engine::root", "processing MultiProofMessage::StateUpdate");
965 if first_update_time.is_none() {
966 self.metrics
968 .first_update_wait_time_histogram
969 .record(start.elapsed().as_secs_f64());
970 first_update_time = Some(Instant::now());
971 debug!(target: "engine::root", "Started state root calculation");
972 }
973
974 let len = update.len();
975 state_update_proofs_requested += self.on_state_update(source, update);
976 debug!(
977 target: "engine::root",
978 ?source,
979 len,
980 ?state_update_proofs_requested,
981 "Received new state update"
982 );
983 }
984 MultiProofMessage::FinishedStateUpdates => {
985 trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
986 updates_finished = true;
987 updates_finished_time = Some(Instant::now());
988 if self.is_done(
989 proofs_processed,
990 state_update_proofs_requested,
991 prefetch_proofs_requested,
992 updates_finished,
993 ) {
994 debug!(
995 target: "engine::root",
996 "State updates finished and all proofs processed, ending calculation"
997 );
998 break
999 }
1000 }
1001 MultiProofMessage::EmptyProof { sequence_number, state } => {
1002 trace!(target: "engine::root", "processing MultiProofMessage::EmptyProof");
1003
1004 proofs_processed += 1;
1005
1006 if let Some(combined_update) = self.on_proof(
1007 sequence_number,
1008 SparseTrieUpdate { state, multiproof: Default::default() },
1009 ) {
1010 let _ = self.to_sparse_trie.send(combined_update);
1011 }
1012
1013 if self.is_done(
1014 proofs_processed,
1015 state_update_proofs_requested,
1016 prefetch_proofs_requested,
1017 updates_finished,
1018 ) {
1019 debug!(
1020 target: "engine::root",
1021 "State updates finished and all proofs processed, ending calculation"
1022 );
1023 break
1024 }
1025 }
1026 MultiProofMessage::ProofCalculated(proof_calculated) => {
1027 trace!(target: "engine::root", "processing
1028 MultiProofMessage::ProofCalculated");
1029
1030 proofs_processed += 1;
1033
1034 self.metrics
1035 .proof_calculation_duration_histogram
1036 .record(proof_calculated.elapsed);
1037
1038 debug!(
1039 target: "engine::root",
1040 sequence = proof_calculated.sequence_number,
1041 total_proofs = proofs_processed,
1042 "Processing calculated proof"
1043 );
1044
1045 self.multiproof_manager.on_calculation_complete();
1046
1047 if let Some(combined_update) =
1048 self.on_proof(proof_calculated.sequence_number, proof_calculated.update)
1049 {
1050 let _ = self.to_sparse_trie.send(combined_update);
1051 }
1052
1053 if self.is_done(
1054 proofs_processed,
1055 state_update_proofs_requested,
1056 prefetch_proofs_requested,
1057 updates_finished,
1058 ) {
1059 debug!(
1060 target: "engine::root",
1061 "State updates finished and all proofs processed, ending calculation");
1062 break
1063 }
1064 }
1065 MultiProofMessage::ProofCalculationError(err) => {
1066 error!(
1067 target: "engine::root",
1068 ?err,
1069 "proof calculation error"
1070 );
1071 return
1072 }
1073 },
1074 Err(_) => {
1075 error!(target: "engine::root", "Internal message channel closed unexpectedly");
1078 return
1079 }
1080 }
1081 }
1082
1083 debug!(
1084 target: "engine::root",
1085 total_updates = state_update_proofs_requested,
1086 total_proofs = proofs_processed,
1087 total_time = ?first_update_time.map(|t|t.elapsed()),
1088 time_since_updates_finished = ?updates_finished_time.map(|t|t.elapsed()),
1089 "All proofs processed, ending calculation"
1090 );
1091
1092 self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
1094 self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
1095 if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
1096 self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1097 }
1098
1099 if let Some(updates_finished_time) = updates_finished_time {
1100 self.metrics
1101 .last_proof_wait_time_histogram
1102 .record(updates_finished_time.elapsed().as_secs_f64());
1103 }
1104 }
1105}
1106
1107fn get_proof_targets(
1111 state_update: &HashedPostState,
1112 fetched_proof_targets: &MultiProofTargets,
1113 multi_added_removed_keys: &MultiAddedRemovedKeys,
1114) -> MultiProofTargets {
1115 let mut targets = MultiProofTargets::default();
1116
1117 for &hashed_address in state_update.accounts.keys() {
1119 if !fetched_proof_targets.contains_key(&hashed_address) {
1120 targets.insert(hashed_address, HashSet::default());
1121 }
1122 }
1123
1124 for (hashed_address, storage) in &state_update.storages {
1126 let fetched = fetched_proof_targets.get(hashed_address);
1127 let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
1128 let mut changed_slots = storage
1129 .storage
1130 .keys()
1131 .filter(|slot| {
1132 !fetched.is_some_and(|f| f.contains(*slot)) ||
1133 storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
1134 })
1135 .peekable();
1136
1137 if storage.wiped && fetched.is_none() {
1139 targets.entry(*hashed_address).or_default();
1140 }
1141
1142 if changed_slots.peek().is_some() {
1143 targets.entry(*hashed_address).or_default().extend(changed_slots);
1144 }
1145 }
1146
1147 targets
1148}
1149
1150#[cfg(test)]
1151mod tests {
1152 use super::*;
1153 use alloy_primitives::map::B256Set;
1154 use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
1155 use reth_trie::{MultiProof, TrieInput};
1156 use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
1157 use revm_primitives::{B256, U256};
1158 use std::sync::Arc;
1159
1160 fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
1161 where
1162 F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
1163 {
1164 let consistent_view = ConsistentDbView::new(factory, None);
1165 let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
1166 let state_sorted = Arc::new(input.state.clone().into_sorted());
1167 let prefix_sets = Arc::new(input.prefix_sets);
1168
1169 MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
1170 }
1171
1172 fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
1173 where
1174 F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
1175 {
1176 let executor = WorkloadExecutor::default();
1177 let config = create_state_root_config(factory, TrieInput::default());
1178 let task_ctx = ProofTaskCtx::new(
1179 config.nodes_sorted.clone(),
1180 config.state_sorted.clone(),
1181 config.prefix_sets.clone(),
1182 );
1183 let proof_task = ProofTaskManager::new(
1184 executor.handle().clone(),
1185 config.consistent_view.clone(),
1186 task_ctx,
1187 1,
1188 );
1189 let channel = channel();
1190
1191 MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1)
1192 }
1193
1194 #[test]
1195 fn test_add_proof_in_sequence() {
1196 let mut sequencer = ProofSequencer::default();
1197 let proof1 = MultiProof::default();
1198 let proof2 = MultiProof::default();
1199 sequencer.next_sequence = 2;
1200
1201 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1202 assert_eq!(ready.len(), 1);
1203 assert!(!sequencer.has_pending());
1204
1205 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1206 assert_eq!(ready.len(), 1);
1207 assert!(!sequencer.has_pending());
1208 }
1209
1210 #[test]
1211 fn test_add_proof_out_of_order() {
1212 let mut sequencer = ProofSequencer::default();
1213 let proof1 = MultiProof::default();
1214 let proof2 = MultiProof::default();
1215 let proof3 = MultiProof::default();
1216 sequencer.next_sequence = 3;
1217
1218 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1219 assert_eq!(ready.len(), 0);
1220 assert!(sequencer.has_pending());
1221
1222 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1223 assert_eq!(ready.len(), 1);
1224 assert!(sequencer.has_pending());
1225
1226 let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1227 assert_eq!(ready.len(), 2);
1228 assert!(!sequencer.has_pending());
1229 }
1230
1231 #[test]
1232 fn test_add_proof_with_gaps() {
1233 let mut sequencer = ProofSequencer::default();
1234 let proof1 = MultiProof::default();
1235 let proof3 = MultiProof::default();
1236 sequencer.next_sequence = 3;
1237
1238 let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1239 assert_eq!(ready.len(), 1);
1240
1241 let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1242 assert_eq!(ready.len(), 0);
1243 assert!(sequencer.has_pending());
1244 }
1245
/// Submits sequence 0 twice and checks that only the first submission yields an update
/// and that the repeat leaves nothing pending.
#[test]
fn test_add_proof_duplicate_sequence() {
    let mut sequencer = ProofSequencer::default();
    let make_update = || SparseTrieUpdate::from_multiproof(MultiProof::default()).unwrap();

    let first_delivery = sequencer.add_proof(0, make_update());
    assert_eq!(first_delivery.len(), 1);

    // Same sequence number again: the assertions expect no output and no buffered proof.
    let repeated_delivery = sequencer.add_proof(0, make_update());
    assert_eq!(repeated_delivery.len(), 0);
    assert!(!sequencer.has_pending());
}
1259
/// Buffers sequences 4, 2, 1 and 3 first, then adds sequence 0 and checks that all five
/// updates are returned by that final call.
#[test]
fn test_add_proof_batch_processing() {
    let mut sequencer = ProofSequencer::default();
    sequencer.next_sequence = 5;
    let make_update = || SparseTrieUpdate::from_multiproof(MultiProof::default()).unwrap();

    // Everything except the first sequence number; the return values are not inspected.
    for sequence in [4, 2, 1, 3] {
        sequencer.add_proof(sequence, make_update());
    }

    let ready = sequencer.add_proof(0, make_update());
    assert_eq!(ready.len(), 5);
    assert!(!sequencer.has_pending());
}
1276
1277 fn create_get_proof_targets_state() -> HashedPostState {
1278 let mut state = HashedPostState::default();
1279
1280 let addr1 = B256::random();
1281 let addr2 = B256::random();
1282 state.accounts.insert(addr1, Some(Default::default()));
1283 state.accounts.insert(addr2, Some(Default::default()));
1284
1285 let mut storage = HashedStorage::default();
1286 let slot1 = B256::random();
1287 let slot2 = B256::random();
1288 storage.storage.insert(slot1, U256::ZERO);
1289 storage.storage.insert(slot2, U256::from(1));
1290 state.storages.insert(addr1, storage);
1291
1292 state
1293 }
1294
/// With nothing fetched yet, every account in the state update must appear as a target.
#[test]
fn test_get_proof_targets_new_account_targets() {
    let state = create_get_proof_targets_state();
    let already_fetched = MultiProofTargets::default();
    let key_tracker = MultiAddedRemovedKeys::new();

    let targets = get_proof_targets(&state, &already_fetched, &key_tracker);

    assert_eq!(targets.len(), state.accounts.len());
    assert!(state.accounts.keys().all(|hashed_address| targets.contains_key(hashed_address)));
}
1308
/// With nothing fetched yet, every storage slot in the state update must appear under its
/// account in the returned targets.
#[test]
fn test_get_proof_targets_new_storage_targets() {
    let state = create_get_proof_targets_state();
    let already_fetched = MultiProofTargets::default();
    let key_tracker = MultiAddedRemovedKeys::new();

    let targets = get_proof_targets(&state, &already_fetched, &key_tracker);

    for (hashed_address, hashed_storage) in state.storages.iter() {
        assert!(targets.contains_key(hashed_address));

        let requested_slots = &targets[hashed_address];
        assert_eq!(requested_slots.len(), hashed_storage.storage.len());
        assert!(hashed_storage.storage.keys().all(|slot| requested_slots.contains(slot)));
    }
}
1326
/// An account without storage that is already recorded as fetched must be left out of the
/// returned targets.
#[test]
fn test_get_proof_targets_filter_already_fetched_accounts() {
    let state = create_get_proof_targets_state();

    // Pick the fixture account that has no storage entry.
    let storage_free_account = state
        .accounts
        .keys()
        .copied()
        .find(|hashed_address| !state.storages.contains_key(hashed_address))
        .expect("Should have an account without storage");

    // Record it as fetched, with an empty slot set.
    let mut fetched = MultiProofTargets::default();
    fetched.insert(storage_free_account, HashSet::default());

    let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());

    assert!(!targets.contains_key(&storage_free_account));
    assert_eq!(targets.len(), state.accounts.len() - 1);
}
1349
/// A storage slot that is already recorded as fetched must be left out of the account's
/// returned slot set, while the remaining slots are still requested.
#[test]
fn test_get_proof_targets_filter_already_fetched_storage() {
    let state = create_get_proof_targets_state();

    // Take the fixture's storage entry and mark one of its slots as fetched.
    let (hashed_address, hashed_storage) = state.storages.iter().next().unwrap();
    let known_slot = *hashed_storage.storage.keys().next().unwrap();

    let mut fetched = MultiProofTargets::default();
    fetched.insert(*hashed_address, std::iter::once(known_slot).collect());

    let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());

    let remaining_slots = &targets[hashed_address];
    assert!(!remaining_slots.contains(&known_slot));
    assert_eq!(remaining_slots.len(), hashed_storage.storage.len() - 1);
}
1369
/// An empty state update combined with empty fetched targets must produce no targets.
#[test]
fn test_get_proof_targets_empty_state() {
    let targets = get_proof_targets(
        &HashedPostState::default(),
        &MultiProofTargets::default(),
        &MultiAddedRemovedKeys::new(),
    );

    assert!(targets.is_empty());
}
1379
/// Two modified accounts, one of which has storage with a single slot already fetched:
/// the other account and the unfetched slot must be targeted, the fetched slot must not.
#[test]
fn test_get_proof_targets_mixed_fetched_state() {
    let account_with_storage = B256::random();
    let plain_account = B256::random();
    let known_slot = B256::random();
    let unknown_slot = B256::random();

    let mut state = HashedPostState::default();
    for hashed_address in [account_with_storage, plain_account] {
        state.accounts.insert(hashed_address, Some(Default::default()));
    }

    let mut storage = HashedStorage::default();
    storage.storage.extend([(known_slot, U256::ZERO), (unknown_slot, U256::from(1))]);
    state.storages.insert(account_with_storage, storage);

    // Only `known_slot` of the storage-bearing account has been fetched before.
    let mut fetched = MultiProofTargets::default();
    fetched.insert(account_with_storage, std::iter::once(known_slot).collect());

    let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());

    assert!(targets.contains_key(&plain_account));
    let requested_slots = &targets[&account_with_storage];
    assert!(!requested_slots.contains(&known_slot));
    assert!(requested_slots.contains(&unknown_slot));
}
1408
/// A state update that carries storage changes for an address with no matching account
/// entry must still produce targets for that address and both of its slots.
#[test]
fn test_get_proof_targets_unmodified_account_with_storage() {
    let hashed_address = B256::random();
    let first_slot = B256::random();
    let second_slot = B256::random();

    // Storage only: the address is deliberately never added to `state.accounts`.
    let mut storage = HashedStorage::default();
    storage.storage.extend([(first_slot, U256::from(1)), (second_slot, U256::from(2))]);
    let mut state = HashedPostState::default();
    state.storages.insert(hashed_address, storage);

    let fetched = MultiProofTargets::default();

    // Sanity-check the preconditions before calling the function under test.
    assert!(!state.accounts.contains_key(&hashed_address));
    assert!(!fetched.contains_key(&hashed_address));

    let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());

    assert!(targets.contains_key(&hashed_address));

    let requested_slots = &targets[&hashed_address];
    assert_eq!(requested_slots.len(), 2);
    for slot in [first_slot, second_slot] {
        assert!(requested_slots.contains(&slot));
    }
}
1438
/// When none of the requested targets overlap with what the task has already fetched,
/// `get_prefetch_proof_targets` must hand the request back unchanged.
#[test]
fn test_get_prefetch_proof_targets_no_duplicates() {
    let mut task = create_test_state_root_task(create_test_provider_factory());

    // Two unrelated accounts, each with a single random slot.
    let mut requested = MultiProofTargets::default();
    for _ in 0..2 {
        requested.insert(B256::random(), std::iter::once(B256::random()).collect());
    }

    // Nothing has been fetched yet.
    assert_eq!(task.get_prefetch_proof_targets(requested.clone()), requested);

    // Record an unrelated account/slot as fetched; the request must still pass through intact.
    task.fetched_proof_targets
        .insert(B256::random(), std::iter::once(B256::random()).collect());

    assert_eq!(task.get_prefetch_proof_targets(requested.clone()), requested);
}
1472
/// Requested targets that were already fetched must be dropped, and only the slots that
/// are still new may remain for a partially fetched account.
#[test]
fn test_get_prefetch_proof_targets_remove_subset() {
    let mut task = create_test_state_root_task(create_test_provider_factory());

    let addr1 = B256::random();
    let addr2 = B256::random();
    let slot1 = B256::random();
    let slot2 = B256::random();

    let mut requested = MultiProofTargets::default();
    requested.insert(addr1, std::iter::once(slot1).collect());
    requested.insert(addr2, std::iter::once(slot2).collect());

    // `addr1`/`slot1` is recorded as already fetched by the task.
    task.fetched_proof_targets.insert(addr1, std::iter::once(slot1).collect());

    // First pass: `addr1` is expected to disappear, leaving only `addr2`.
    let outstanding = task.get_prefetch_proof_targets(requested.clone());
    assert_eq!(outstanding.len(), 1);
    assert!(!outstanding.contains_key(&addr1));
    assert!(outstanding.contains_key(&addr2));

    // Request one additional, not yet fetched slot for `addr1`.
    let slot3 = B256::random();
    requested.get_mut(&addr1).unwrap().insert(slot3);

    // Second pass: each account must map to exactly its one unfetched slot.
    let outstanding = task.get_prefetch_proof_targets(requested.clone());
    assert_eq!(outstanding.len(), 2);
    for (hashed_address, expected_slot) in [(addr1, slot3), (addr2, slot2)] {
        let expected_slots = std::iter::once(expected_slot).collect::<B256Set>();
        assert_eq!(outstanding.get(&hashed_address), Some(&expected_slots));
    }
}
1519
/// A slot that was already fetched must be requested again once the added/removed key
/// tracker has been updated with a state that sets that slot to zero.
#[test]
fn test_get_proof_targets_with_removed_storage_keys() {
    let hashed_address = B256::random();
    let tracked_slot = B256::random();
    let untouched_slot = B256::random();

    // State update under test: one modified account with two non-zero slots.
    let mut storage = HashedStorage::default();
    storage.storage.extend([
        (tracked_slot, U256::from(100)),
        (untouched_slot, U256::from(200)),
    ]);
    let mut state = HashedPostState::default();
    state.accounts.insert(hashed_address, Some(Default::default()));
    state.storages.insert(hashed_address, storage);

    // Only `tracked_slot` has been fetched so far.
    let mut fetched = MultiProofTargets::default();
    fetched.insert(hashed_address, std::iter::once(tracked_slot).collect());

    // Feed the tracker a separate update in which `tracked_slot` is zero.
    // NOTE(review): presumably a zero value is what marks the slot as removed — confirm
    // against `MultiAddedRemovedKeys::update_with_state`.
    let mut zeroing_storage = HashedStorage::default();
    zeroing_storage.storage.insert(tracked_slot, U256::ZERO);
    let mut zeroing_state = HashedPostState::default();
    zeroing_state.storages.insert(hashed_address, zeroing_storage);
    let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
    multi_added_removed_keys.update_with_state(&zeroing_state);

    let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);

    // Both slots are expected: the unfetched one and the previously fetched, tracked one.
    assert!(targets.contains_key(&hashed_address));
    let requested_slots = &targets[&hashed_address];
    assert_eq!(requested_slots.len(), 2);
    for slot in [tracked_slot, untouched_slot] {
        assert!(requested_slots.contains(&slot));
    }
}
1560
/// Storage created with `HashedStorage::new(true)` that carries one slot must still
/// yield that slot as a proof target.
#[test]
fn test_get_proof_targets_with_wiped_storage() {
    let hashed_address = B256::random();
    let only_slot = B256::random();

    // Storage is constructed with the flag set to `true` and then given a single slot.
    let mut wiped_storage = HashedStorage::new(true);
    wiped_storage.storage.insert(only_slot, U256::from(100));

    let mut state = HashedPostState::default();
    state.accounts.insert(hashed_address, Some(Default::default()));
    state.storages.insert(hashed_address, wiped_storage);

    let fetched = MultiProofTargets::default();
    let multi_added_removed_keys = MultiAddedRemovedKeys::new();

    let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);

    assert!(targets.contains_key(&hashed_address));
    let requested_slots = &targets[&hashed_address];
    assert_eq!(requested_slots.len(), 1);
    assert!(requested_slots.contains(&only_slot));
}
1586
/// A slot known to the added/removed key tracker but absent from the current state update
/// must not cause the account to be targeted when every updated slot is already fetched.
#[test]
fn test_get_proof_targets_removed_keys_not_in_state_update() {
    let hashed_address = B256::random();
    let slot1 = B256::random();
    let slot2 = B256::random();
    let slot3 = B256::random();

    // The state update touches only `slot1` and `slot2`.
    let mut storage = HashedStorage::default();
    storage.storage.extend([(slot1, U256::from(100)), (slot2, U256::from(200))]);
    let mut state = HashedPostState::default();
    state.accounts.insert(hashed_address, Some(Default::default()));
    state.storages.insert(hashed_address, storage);

    // All three slots, including the one outside the update, have already been fetched.
    let mut fetched = MultiProofTargets::default();
    fetched.insert(hashed_address, [slot1, slot2, slot3].into_iter().collect());

    // The tracker is updated with a state in which `slot3` is zero.
    let mut zeroing_storage = HashedStorage::default();
    zeroing_storage.storage.insert(slot3, U256::ZERO);
    let mut zeroing_state = HashedPostState::default();
    zeroing_state.storages.insert(hashed_address, zeroing_storage);
    let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
    multi_added_removed_keys.update_with_state(&zeroing_state);

    let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);

    // `slot3` is not part of this update, so nothing new is expected for the account.
    assert!(!targets.contains_key(&hashed_address));
}
1626}