1use alloy_evm::block::StateChangeSource;
4use alloy_primitives::{
5    keccak256,
6    map::{B256Set, HashSet},
7    B256,
8};
9use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
10use dashmap::DashMap;
11use derive_more::derive::Deref;
12use metrics::{Gauge, Histogram};
13use reth_metrics::Metrics;
14use reth_revm::state::EvmState;
15use reth_trie::{
16    added_removed_keys::MultiAddedRemovedKeys, prefix_set::TriePrefixSetsMut,
17    updates::TrieUpdatesSorted, DecodedMultiProof, HashedPostState, HashedPostStateSorted,
18    HashedStorage, MultiProofTargets, TrieInput,
19};
20use reth_trie_parallel::{
21    proof::ParallelProof,
22    proof_task::{
23        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
24        StorageProofInput,
25    },
26};
27use std::{collections::BTreeMap, ops::DerefMut, sync::Arc, time::Instant};
28use tracing::{debug, error, instrument, trace};
29
30#[derive(Default, Debug)]
33pub struct SparseTrieUpdate {
34    pub(crate) state: HashedPostState,
36    pub(crate) multiproof: DecodedMultiProof,
38}
39
40impl SparseTrieUpdate {
41    pub(super) fn is_empty(&self) -> bool {
43        self.state.is_empty() && self.multiproof.is_empty()
44    }
45
46    #[cfg(test)]
48    pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
49        Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
50    }
51
52    pub(super) fn extend(&mut self, other: Self) {
54        self.state.extend(other.state);
55        self.multiproof.extend(other.multiproof);
56    }
57}
58
59#[derive(Debug, Clone, Default)]
61pub(crate) struct MultiProofConfig {
62    pub nodes_sorted: Arc<TrieUpdatesSorted>,
65    pub state_sorted: Arc<HashedPostStateSorted>,
67    pub prefix_sets: Arc<TriePrefixSetsMut>,
71}
72
73impl MultiProofConfig {
74    pub(crate) fn from_input(mut input: TrieInput) -> (TrieInput, Self) {
79        let config = Self {
80            nodes_sorted: Arc::new(input.nodes.drain_into_sorted()),
81            state_sorted: Arc::new(input.state.drain_into_sorted()),
82            prefix_sets: Arc::new(input.prefix_sets.clone()),
83        };
84        (input.cleared(), config)
85    }
86}
87
88#[derive(Debug)]
90pub(super) enum MultiProofMessage {
91    PrefetchProofs(MultiProofTargets),
93    StateUpdate(StateChangeSource, EvmState),
95    EmptyProof {
100        sequence_number: u64,
102        state: HashedPostState,
104    },
105    FinishedStateUpdates,
110}
111
112#[derive(Debug, Default)]
114struct ProofSequencer {
115    next_sequence: u64,
117    next_to_deliver: u64,
119    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
121}
122
123impl ProofSequencer {
124    const fn next_sequence(&mut self) -> u64 {
126        let seq = self.next_sequence;
127        self.next_sequence += 1;
128        seq
129    }
130
131    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
134        if sequence >= self.next_to_deliver {
135            self.pending_proofs.insert(sequence, update);
136        }
137
138        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
140            return Vec::new()
141        }
142
143        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
144        let mut current_sequence = self.next_to_deliver;
145
146        while let Some(pending) = self.pending_proofs.remove(¤t_sequence) {
148            consecutive_proofs.push(pending);
149            current_sequence += 1;
150
151            if !self.pending_proofs.contains_key(¤t_sequence) {
153                break;
154            }
155        }
156
157        self.next_to_deliver += consecutive_proofs.len() as u64;
158
159        consecutive_proofs
160    }
161
162    pub(crate) fn has_pending(&self) -> bool {
164        !self.pending_proofs.is_empty()
165    }
166}
167
168#[derive(Deref, Debug)]
174pub(super) struct StateHookSender(CrossbeamSender<MultiProofMessage>);
175
176impl StateHookSender {
177    pub(crate) const fn new(inner: CrossbeamSender<MultiProofMessage>) -> Self {
178        Self(inner)
179    }
180}
181
182impl Drop for StateHookSender {
183    fn drop(&mut self) {
184        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
186    }
187}
188
189pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
190    let mut hashed_state = HashedPostState::with_capacity(update.len());
191
192    for (address, account) in update {
193        if account.is_touched() {
194            let hashed_address = keccak256(address);
195            trace!(target: "engine::tree::payload_processor::multiproof", ?address, ?hashed_address, "Adding account to state update");
196
197            let destroyed = account.is_selfdestructed();
198            let info = if destroyed { None } else { Some(account.info.into()) };
199            hashed_state.accounts.insert(hashed_address, info);
200
201            let mut changed_storage_iter = account
202                .storage
203                .into_iter()
204                .filter(|(_slot, value)| value.is_changed())
205                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
206                .peekable();
207
208            if destroyed {
209                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
210            } else if changed_storage_iter.peek().is_some() {
211                hashed_state
212                    .storages
213                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
214            }
215        }
216    }
217
218    hashed_state
219}
220
221#[derive(Debug)]
223enum PendingMultiproofTask {
224    Storage(StorageMultiproofInput),
226    Regular(MultiproofInput),
228}
229
230impl PendingMultiproofTask {
231    const fn proof_sequence_number(&self) -> u64 {
233        match self {
234            Self::Storage(input) => input.proof_sequence_number,
235            Self::Regular(input) => input.proof_sequence_number,
236        }
237    }
238
239    fn proof_targets_is_empty(&self) -> bool {
241        match self {
242            Self::Storage(input) => input.proof_targets.is_empty(),
243            Self::Regular(input) => input.proof_targets.is_empty(),
244        }
245    }
246
247    fn send_empty_proof(self) {
249        match self {
250            Self::Storage(input) => input.send_empty_proof(),
251            Self::Regular(input) => input.send_empty_proof(),
252        }
253    }
254}
255
256impl From<StorageMultiproofInput> for PendingMultiproofTask {
257    fn from(input: StorageMultiproofInput) -> Self {
258        Self::Storage(input)
259    }
260}
261
262impl From<MultiproofInput> for PendingMultiproofTask {
263    fn from(input: MultiproofInput) -> Self {
264        Self::Regular(input)
265    }
266}
267
268#[derive(Debug)]
270struct StorageMultiproofInput {
271    hashed_state_update: HashedPostState,
272    hashed_address: B256,
273    proof_targets: B256Set,
274    proof_sequence_number: u64,
275    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
276    multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
277}
278
279impl StorageMultiproofInput {
280    fn send_empty_proof(self) {
282        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
283            sequence_number: self.proof_sequence_number,
284            state: self.hashed_state_update,
285        });
286    }
287}
288
289#[derive(Debug)]
291struct MultiproofInput {
292    source: Option<StateChangeSource>,
293    hashed_state_update: HashedPostState,
294    proof_targets: MultiProofTargets,
295    proof_sequence_number: u64,
296    state_root_message_sender: CrossbeamSender<MultiProofMessage>,
297    multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
298}
299
300impl MultiproofInput {
301    fn send_empty_proof(self) {
303        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
304            sequence_number: self.proof_sequence_number,
305            state: self.hashed_state_update,
306        });
307    }
308}
309
310#[derive(Debug)]
321pub struct MultiproofManager {
322    proof_worker_handle: ProofWorkerHandle,
324    missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
336    proof_result_tx: CrossbeamSender<ProofResultMessage>,
339    metrics: MultiProofTaskMetrics,
341}
342
343impl MultiproofManager {
344    fn new(
346        metrics: MultiProofTaskMetrics,
347        proof_worker_handle: ProofWorkerHandle,
348        proof_result_tx: CrossbeamSender<ProofResultMessage>,
349    ) -> Self {
350        metrics.max_storage_workers.set(proof_worker_handle.total_storage_workers() as f64);
352        metrics.max_account_workers.set(proof_worker_handle.total_account_workers() as f64);
353
354        Self {
355            metrics,
356            proof_worker_handle,
357            missed_leaves_storage_roots: Default::default(),
358            proof_result_tx,
359        }
360    }
361
362    fn dispatch(&self, input: PendingMultiproofTask) {
364        if input.proof_targets_is_empty() {
366            debug!(
367                sequence_number = input.proof_sequence_number(),
368                "No proof targets, sending empty multiproof back immediately"
369            );
370            input.send_empty_proof();
371            return
372        }
373
374        match input {
375            PendingMultiproofTask::Storage(storage_input) => {
376                self.dispatch_storage_proof(storage_input);
377            }
378            PendingMultiproofTask::Regular(multiproof_input) => {
379                self.dispatch_multiproof(multiproof_input);
380            }
381        }
382    }
383
384    fn dispatch_storage_proof(&self, storage_multiproof_input: StorageMultiproofInput) {
386        let StorageMultiproofInput {
387            hashed_state_update,
388            hashed_address,
389            proof_targets,
390            proof_sequence_number,
391            multi_added_removed_keys,
392            state_root_message_sender: _,
393        } = storage_multiproof_input;
394
395        let storage_targets = proof_targets.len();
396
397        trace!(
398            target: "engine::tree::payload_processor::multiproof",
399            proof_sequence_number,
400            ?proof_targets,
401            storage_targets,
402            "Dispatching storage proof to workers"
403        );
404
405        let start = Instant::now();
406
407        let prefix_set = reth_trie::prefix_set::PrefixSetMut::from(
409            proof_targets.iter().map(reth_trie::Nibbles::unpack),
410        );
411        let prefix_set = prefix_set.freeze();
412
413        let input = StorageProofInput::new(
415            hashed_address,
416            prefix_set,
417            proof_targets,
418            true, Some(multi_added_removed_keys),
420        );
421
422        if let Err(e) = self.proof_worker_handle.dispatch_storage_proof(
424            input,
425            ProofResultContext::new(
426                self.proof_result_tx.clone(),
427                proof_sequence_number,
428                hashed_state_update,
429                start,
430            ),
431        ) {
432            error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch storage proof");
433            return;
434        }
435
436        self.metrics
437            .active_storage_workers_histogram
438            .record(self.proof_worker_handle.active_storage_workers() as f64);
439        self.metrics
440            .active_account_workers_histogram
441            .record(self.proof_worker_handle.active_account_workers() as f64);
442        self.metrics
443            .pending_storage_multiproofs_histogram
444            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
445        self.metrics
446            .pending_account_multiproofs_histogram
447            .record(self.proof_worker_handle.pending_account_tasks() as f64);
448    }
449
450    fn on_calculation_complete(&self) {
452        self.metrics
453            .active_storage_workers_histogram
454            .record(self.proof_worker_handle.active_storage_workers() as f64);
455        self.metrics
456            .active_account_workers_histogram
457            .record(self.proof_worker_handle.active_account_workers() as f64);
458        self.metrics
459            .pending_storage_multiproofs_histogram
460            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
461        self.metrics
462            .pending_account_multiproofs_histogram
463            .record(self.proof_worker_handle.pending_account_tasks() as f64);
464    }
465
466    fn dispatch_multiproof(&self, multiproof_input: MultiproofInput) {
468        let MultiproofInput {
469            source,
470            hashed_state_update,
471            proof_targets,
472            proof_sequence_number,
473            state_root_message_sender: _,
474            multi_added_removed_keys,
475        } = multiproof_input;
476
477        let missed_leaves_storage_roots = self.missed_leaves_storage_roots.clone();
478        let account_targets = proof_targets.len();
479        let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
480
481        trace!(
482            target: "engine::tree::payload_processor::multiproof",
483            proof_sequence_number,
484            ?proof_targets,
485            account_targets,
486            storage_targets,
487            ?source,
488            "Dispatching multiproof to workers"
489        );
490
491        let start = Instant::now();
492
493        let frozen_prefix_sets =
495            ParallelProof::extend_prefix_sets_with_targets(&Default::default(), &proof_targets);
496
497        let input = AccountMultiproofInput {
499            targets: proof_targets,
500            prefix_sets: frozen_prefix_sets,
501            collect_branch_node_masks: true,
502            multi_added_removed_keys,
503            missed_leaves_storage_roots,
504            proof_result_sender: ProofResultContext::new(
506                self.proof_result_tx.clone(),
507                proof_sequence_number,
508                hashed_state_update,
509                start,
510            ),
511        };
512
513        if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(input) {
514            error!(target: "engine::tree::payload_processor::multiproof", ?e, "Failed to dispatch account multiproof");
515            return;
516        }
517
518        self.metrics
519            .active_storage_workers_histogram
520            .record(self.proof_worker_handle.active_storage_workers() as f64);
521        self.metrics
522            .active_account_workers_histogram
523            .record(self.proof_worker_handle.active_account_workers() as f64);
524        self.metrics
525            .pending_storage_multiproofs_histogram
526            .record(self.proof_worker_handle.pending_storage_tasks() as f64);
527        self.metrics
528            .pending_account_multiproofs_histogram
529            .record(self.proof_worker_handle.pending_account_tasks() as f64);
530    }
531}
532
533#[derive(Metrics, Clone)]
534#[metrics(scope = "tree.root")]
535pub(crate) struct MultiProofTaskMetrics {
536    pub active_storage_workers_histogram: Histogram,
538    pub active_account_workers_histogram: Histogram,
540    pub max_storage_workers: Gauge,
542    pub max_account_workers: Gauge,
544    pub pending_storage_multiproofs_histogram: Histogram,
546    pub pending_account_multiproofs_histogram: Histogram,
548
549    pub prefetch_proof_targets_accounts_histogram: Histogram,
551    pub prefetch_proof_targets_storages_histogram: Histogram,
553    pub prefetch_proof_chunks_histogram: Histogram,
555
556    pub state_update_proof_targets_accounts_histogram: Histogram,
558    pub state_update_proof_targets_storages_histogram: Histogram,
560    pub state_update_proof_chunks_histogram: Histogram,
562
563    pub proof_calculation_duration_histogram: Histogram,
565
566    pub sparse_trie_update_duration_histogram: Histogram,
568    pub sparse_trie_final_update_duration_histogram: Histogram,
570    pub sparse_trie_total_duration_histogram: Histogram,
572
573    pub state_updates_received_histogram: Histogram,
575    pub proofs_processed_histogram: Histogram,
577    pub multiproof_task_total_duration_histogram: Histogram,
579    pub first_update_wait_time_histogram: Histogram,
581    pub last_proof_wait_time_histogram: Histogram,
583}
584
585#[derive(Debug)]
685pub(super) struct MultiProofTask {
686    chunk_size: Option<usize>,
689    rx: CrossbeamReceiver<MultiProofMessage>,
691    tx: CrossbeamSender<MultiProofMessage>,
693    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
695    to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
697    fetched_proof_targets: MultiProofTargets,
699    multi_added_removed_keys: MultiAddedRemovedKeys,
701    proof_sequencer: ProofSequencer,
703    multiproof_manager: MultiproofManager,
705    metrics: MultiProofTaskMetrics,
707}
708
709impl MultiProofTask {
710    pub(super) fn new(
712        proof_worker_handle: ProofWorkerHandle,
713        to_sparse_trie: std::sync::mpsc::Sender<SparseTrieUpdate>,
714        chunk_size: Option<usize>,
715    ) -> Self {
716        let (tx, rx) = unbounded();
717        let (proof_result_tx, proof_result_rx) = unbounded();
718        let metrics = MultiProofTaskMetrics::default();
719
720        Self {
721            chunk_size,
722            rx,
723            tx,
724            proof_result_rx,
725            to_sparse_trie,
726            fetched_proof_targets: Default::default(),
727            multi_added_removed_keys: MultiAddedRemovedKeys::new(),
728            proof_sequencer: ProofSequencer::default(),
729            multiproof_manager: MultiproofManager::new(
730                metrics.clone(),
731                proof_worker_handle,
732                proof_result_tx,
733            ),
734            metrics,
735        }
736    }
737
738    pub(super) fn state_root_message_sender(&self) -> CrossbeamSender<MultiProofMessage> {
740        self.tx.clone()
741    }
742
743    #[instrument(
747        level = "debug",
748        target = "engine::tree::payload_processor::multiproof",
749        skip_all,
750        fields(accounts = targets.len(), chunks = 0)
751    )]
752    fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
753        let proof_targets = self.get_prefetch_proof_targets(targets);
754        self.fetched_proof_targets.extend_ref(&proof_targets);
755
756        self.multi_added_removed_keys.touch_accounts(proof_targets.keys().copied());
760
761        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
763
764        self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
765        self.metrics
766            .prefetch_proof_targets_storages_histogram
767            .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
768
769        let mut chunks = 0;
771
772        let should_chunk = self.multiproof_manager.proof_worker_handle.available_account_workers() >
775            1 ||
776            self.multiproof_manager.proof_worker_handle.available_storage_workers() > 1;
777
778        let mut dispatch = |proof_targets| {
779            self.multiproof_manager.dispatch(
780                MultiproofInput {
781                    source: None,
782                    hashed_state_update: Default::default(),
783                    proof_targets,
784                    proof_sequence_number: self.proof_sequencer.next_sequence(),
785                    state_root_message_sender: self.tx.clone(),
786                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
787                }
788                .into(),
789            );
790            chunks += 1;
791        };
792
793        if should_chunk &&
794            let Some(chunk_size) = self.chunk_size &&
795            proof_targets.chunking_length() > chunk_size
796        {
797            let mut chunks = 0usize;
798            for proof_targets_chunk in proof_targets.chunks(chunk_size) {
799                dispatch(proof_targets_chunk);
800                chunks += 1;
801            }
802            tracing::Span::current().record("chunks", chunks);
803        } else {
804            dispatch(proof_targets);
805        }
806
807        self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
808
809        chunks
810    }
811
812    fn is_done(
814        &self,
815        proofs_processed: u64,
816        state_update_proofs_requested: u64,
817        prefetch_proofs_requested: u64,
818        updates_finished: bool,
819    ) -> bool {
820        let all_proofs_processed =
821            proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
822        let no_pending = !self.proof_sequencer.has_pending();
823        trace!(
824            target: "engine::tree::payload_processor::multiproof",
825            proofs_processed,
826            state_update_proofs_requested,
827            prefetch_proofs_requested,
828            no_pending,
829            updates_finished,
830            "Checking end condition"
831        );
832        all_proofs_processed && no_pending && updates_finished
833    }
834
835    fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
837        let mut duplicates = 0;
841
842        targets.retain(|hashed_address, target_storage| {
844            let keep = self
845                .fetched_proof_targets
846                .get(hashed_address)
847                .is_none_or(|fetched_storage| {
849                    !target_storage.is_subset(fetched_storage)
851                });
852
853            if !keep {
854                duplicates += target_storage.len();
855            }
856
857            keep
858        });
859
860        for (hashed_address, target_storage) in targets.deref_mut() {
862            let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
863                continue
866            };
867
868            let prev_target_storage_len = target_storage.len();
869
870            target_storage.retain(|slot| !fetched_storage.contains(slot));
874
875            duplicates += prev_target_storage_len - target_storage.len();
876        }
877
878        if duplicates > 0 {
879            trace!(target: "engine::tree::payload_processor::multiproof", duplicates, "Removed duplicate prefetch proof targets");
880        }
881
882        targets
883    }
884
885    #[instrument(
889        level = "debug",
890        target = "engine::tree::payload_processor::multiproof",
891        skip(self, update),
892        fields(accounts = update.len(), chunks = 0)
893    )]
894    fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
895        let hashed_state_update = evm_state_to_hashed_post_state(update);
896
897        self.multi_added_removed_keys.update_with_state(&hashed_state_update);
899
900        let (fetched_state_update, not_fetched_state_update) = hashed_state_update
903            .partition_by_targets(&self.fetched_proof_targets, &self.multi_added_removed_keys);
904
905        let mut state_updates = 0;
906        if !fetched_state_update.is_empty() {
909            let _ = self.tx.send(MultiProofMessage::EmptyProof {
910                sequence_number: self.proof_sequencer.next_sequence(),
911                state: fetched_state_update,
912            });
913            state_updates += 1;
914        }
915
916        let multi_added_removed_keys = Arc::new(self.multi_added_removed_keys.clone());
918
919        let mut chunks = 0;
921
922        let mut spawned_proof_targets = MultiProofTargets::default();
923
924        let should_chunk = self.multiproof_manager.proof_worker_handle.available_account_workers() >
927            1 ||
928            self.multiproof_manager.proof_worker_handle.available_storage_workers() > 1;
929
930        let mut dispatch = |hashed_state_update| {
931            let proof_targets = get_proof_targets(
932                &hashed_state_update,
933                &self.fetched_proof_targets,
934                &multi_added_removed_keys,
935            );
936            spawned_proof_targets.extend_ref(&proof_targets);
937
938            self.multiproof_manager.dispatch(
939                MultiproofInput {
940                    source: Some(source),
941                    hashed_state_update,
942                    proof_targets,
943                    proof_sequence_number: self.proof_sequencer.next_sequence(),
944                    state_root_message_sender: self.tx.clone(),
945                    multi_added_removed_keys: Some(multi_added_removed_keys.clone()),
946                }
947                .into(),
948            );
949
950            chunks += 1;
951        };
952
953        if should_chunk &&
954            let Some(chunk_size) = self.chunk_size &&
955            not_fetched_state_update.chunking_length() > chunk_size
956        {
957            let mut chunks = 0usize;
958            for chunk in not_fetched_state_update.chunks(chunk_size) {
959                dispatch(chunk);
960                chunks += 1;
961            }
962            tracing::Span::current().record("chunks", chunks);
963        } else {
964            dispatch(not_fetched_state_update);
965        }
966
967        self.metrics
968            .state_update_proof_targets_accounts_histogram
969            .record(spawned_proof_targets.len() as f64);
970        self.metrics
971            .state_update_proof_targets_storages_histogram
972            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
973        self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
974
975        self.fetched_proof_targets.extend(spawned_proof_targets);
976
977        state_updates + chunks
978    }
979
980    fn on_proof(
982        &mut self,
983        sequence_number: u64,
984        update: SparseTrieUpdate,
985    ) -> Option<SparseTrieUpdate> {
986        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
987
988        ready_proofs
989            .into_iter()
990            .reduce(|mut acc_update, update| {
992                acc_update.extend(update);
993                acc_update
994            })
995            .filter(|proof| !proof.is_empty())
997    }
998
999    #[instrument(
1032        level = "debug",
1033        name = "MultiProofTask::run",
1034        target = "engine::tree::payload_processor::multiproof",
1035        skip_all
1036    )]
1037    pub(crate) fn run(mut self) {
1038        let mut prefetch_proofs_requested = 0;
1040        let mut state_update_proofs_requested = 0;
1041        let mut proofs_processed = 0;
1042
1043        let mut updates_finished = false;
1044
1045        let start = Instant::now();
1047
1048        let mut first_update_time = None;
1050        let mut updates_finished_time = None;
1052
1053        loop {
1054            trace!(target: "engine::tree::payload_processor::multiproof", "entering main channel receiving loop");
1055
1056            crossbeam_channel::select_biased! {
1057                recv(self.proof_result_rx) -> proof_msg => {
1058                    match proof_msg {
1059                        Ok(proof_result) => {
1060                            proofs_processed += 1;
1061
1062                            self.metrics
1063                                .proof_calculation_duration_histogram
1064                                .record(proof_result.elapsed);
1065
1066                            self.multiproof_manager.on_calculation_complete();
1067
1068                            match proof_result.result {
1070                                Ok(proof_result_data) => {
1071                                    debug!(
1072                                        target: "engine::tree::payload_processor::multiproof",
1073                                        sequence = proof_result.sequence_number,
1074                                        total_proofs = proofs_processed,
1075                                        "Processing calculated proof from worker"
1076                                    );
1077
1078                                    let update = SparseTrieUpdate {
1079                                        state: proof_result.state,
1080                                        multiproof: proof_result_data.into_multiproof(),
1081                                    };
1082
1083                                    if let Some(combined_update) =
1084                                        self.on_proof(proof_result.sequence_number, update)
1085                                    {
1086                                        let _ = self.to_sparse_trie.send(combined_update);
1087                                    }
1088                                }
1089                                Err(error) => {
1090                                    error!(target: "engine::tree::payload_processor::multiproof", ?error, "proof calculation error from worker");
1091                                    return
1092                                }
1093                            }
1094
1095                            if self.is_done(
1096                                proofs_processed,
1097                                state_update_proofs_requested,
1098                                prefetch_proofs_requested,
1099                                updates_finished,
1100                            ) {
1101                                debug!(
1102                                    target: "engine::tree::payload_processor::multiproof",
1103                                    "State updates finished and all proofs processed, ending calculation"
1104                                );
1105                                break
1106                            }
1107                        }
1108                        Err(_) => {
1109                            error!(target: "engine::tree::payload_processor::multiproof", "Proof result channel closed unexpectedly");
1110                            return
1111                        }
1112                    }
1113                },
1114                recv(self.rx) -> message => {
1115                    match message {
1116                        Ok(msg) => match msg {
1117                            MultiProofMessage::PrefetchProofs(targets) => {
1118                                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::PrefetchProofs");
1119
1120                                if first_update_time.is_none() {
1121                                    self.metrics
1123                                        .first_update_wait_time_histogram
1124                                        .record(start.elapsed().as_secs_f64());
1125                                    first_update_time = Some(Instant::now());
1126                                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
1127                                }
1128
1129                                let account_targets = targets.len();
1130                                let storage_targets =
1131                                    targets.values().map(|slots| slots.len()).sum::<usize>();
1132                                prefetch_proofs_requested += self.on_prefetch_proof(targets);
1133                                debug!(
1134                                    target: "engine::tree::payload_processor::multiproof",
1135                                    account_targets,
1136                                    storage_targets,
1137                                    prefetch_proofs_requested,
1138                                    "Prefetching proofs"
1139                                );
1140                            }
1141                            MultiProofMessage::StateUpdate(source, update) => {
1142                                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::StateUpdate");
1143
1144                                if first_update_time.is_none() {
1145                                    self.metrics
1147                                        .first_update_wait_time_histogram
1148                                        .record(start.elapsed().as_secs_f64());
1149                                    first_update_time = Some(Instant::now());
1150                                    debug!(target: "engine::tree::payload_processor::multiproof", "Started state root calculation");
1151                                }
1152
1153                                let len = update.len();
1154                                state_update_proofs_requested += self.on_state_update(source, update);
1155                                debug!(
1156                                    target: "engine::tree::payload_processor::multiproof",
1157                                    ?source,
1158                                    len,
1159                                    ?state_update_proofs_requested,
1160                                    "Received new state update"
1161                                );
1162                            }
1163                            MultiProofMessage::FinishedStateUpdates => {
1164                                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::FinishedStateUpdates");
1165
1166                                updates_finished = true;
1167                                updates_finished_time = Some(Instant::now());
1168
1169                                if self.is_done(
1170                                    proofs_processed,
1171                                    state_update_proofs_requested,
1172                                    prefetch_proofs_requested,
1173                                    updates_finished,
1174                                ) {
1175                                    debug!(
1176                                        target: "engine::tree::payload_processor::multiproof",
1177                                        "State updates finished and all proofs processed, ending calculation"
1178                                    );
1179                                    break
1180                                }
1181                            }
1182                            MultiProofMessage::EmptyProof { sequence_number, state } => {
1183                                trace!(target: "engine::tree::payload_processor::multiproof", "processing MultiProofMessage::EmptyProof");
1184
1185                                proofs_processed += 1;
1186
1187                                if let Some(combined_update) = self.on_proof(
1188                                    sequence_number,
1189                                    SparseTrieUpdate { state, multiproof: Default::default() },
1190                                ) {
1191                                    let _ = self.to_sparse_trie.send(combined_update);
1192                                }
1193
1194                                if self.is_done(
1195                                    proofs_processed,
1196                                    state_update_proofs_requested,
1197                                    prefetch_proofs_requested,
1198                                    updates_finished,
1199                                ) {
1200                                    debug!(
1201                                        target: "engine::tree::payload_processor::multiproof",
1202                                        "State updates finished and all proofs processed, ending calculation"
1203                                    );
1204                                    break
1205                                }
1206                            }
1207                        },
1208                        Err(_) => {
1209                            error!(target: "engine::tree::payload_processor::multiproof", "State root related message channel closed unexpectedly");
1210                            return
1211                        }
1212                    }
1213                }
1214            }
1215        }
1216
1217        debug!(
1218            target: "engine::tree::payload_processor::multiproof",
1219            total_updates = state_update_proofs_requested,
1220            total_proofs = proofs_processed,
1221            total_time = ?first_update_time.map(|t|t.elapsed()),
1222            time_since_updates_finished = ?updates_finished_time.map(|t|t.elapsed()),
1223            "All proofs processed, ending calculation"
1224        );
1225
1226        self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
1228        self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
1229        if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
1230            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1231        }
1232
1233        if let Some(updates_finished_time) = updates_finished_time {
1234            self.metrics
1235                .last_proof_wait_time_histogram
1236                .record(updates_finished_time.elapsed().as_secs_f64());
1237        }
1238    }
1239}
1240
1241fn get_proof_targets(
1245    state_update: &HashedPostState,
1246    fetched_proof_targets: &MultiProofTargets,
1247    multi_added_removed_keys: &MultiAddedRemovedKeys,
1248) -> MultiProofTargets {
1249    let mut targets = MultiProofTargets::default();
1250
1251    for &hashed_address in state_update.accounts.keys() {
1253        if !fetched_proof_targets.contains_key(&hashed_address) {
1254            targets.insert(hashed_address, HashSet::default());
1255        }
1256    }
1257
1258    for (hashed_address, storage) in &state_update.storages {
1260        let fetched = fetched_proof_targets.get(hashed_address);
1261        let storage_added_removed_keys = multi_added_removed_keys.get_storage(hashed_address);
1262        let mut changed_slots = storage
1263            .storage
1264            .keys()
1265            .filter(|slot| {
1266                !fetched.is_some_and(|f| f.contains(*slot)) ||
1267                    storage_added_removed_keys.is_some_and(|k| k.is_removed(slot))
1268            })
1269            .peekable();
1270
1271        if storage.wiped && fetched.is_none() {
1273            targets.entry(*hashed_address).or_default();
1274        }
1275
1276        if changed_slots.peek().is_some() {
1277            targets.entry(*hashed_address).or_default().extend(changed_slots);
1278        }
1279    }
1280
1281    targets
1282}
1283
1284#[cfg(test)]
1285mod tests {
1286    use super::*;
1287    use alloy_primitives::map::B256Set;
1288    use reth_provider::{
1289        providers::OverlayStateProviderFactory, test_utils::create_test_provider_factory,
1290        BlockReader, DatabaseProviderFactory, PruneCheckpointReader, StageCheckpointReader,
1291        TrieReader,
1292    };
1293    use reth_trie::MultiProof;
1294    use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofWorkerHandle};
1295    use revm_primitives::{B256, U256};
1296    use std::sync::OnceLock;
1297    use tokio::runtime::{Handle, Runtime};
1298
1299    fn get_test_runtime_handle() -> Handle {
1301        static TEST_RT: OnceLock<Runtime> = OnceLock::new();
1302        TEST_RT
1303            .get_or_init(|| {
1304                tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()
1305            })
1306            .handle()
1307            .clone()
1308    }
1309
1310    fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
1311    where
1312        F: DatabaseProviderFactory<
1313                Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
1314            > + Clone
1315            + Send
1316            + 'static,
1317    {
1318        let rt_handle = get_test_runtime_handle();
1319        let overlay_factory = OverlayStateProviderFactory::new(factory);
1320        let task_ctx = ProofTaskCtx::new(overlay_factory, Default::default());
1321        let proof_handle = ProofWorkerHandle::new(rt_handle, task_ctx, 1, 1);
1322        let (to_sparse_trie, _receiver) = std::sync::mpsc::channel();
1323
1324        MultiProofTask::new(proof_handle, to_sparse_trie, Some(1))
1325    }
1326
1327    #[test]
1328    fn test_add_proof_in_sequence() {
1329        let mut sequencer = ProofSequencer::default();
1330        let proof1 = MultiProof::default();
1331        let proof2 = MultiProof::default();
1332        sequencer.next_sequence = 2;
1333
1334        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1335        assert_eq!(ready.len(), 1);
1336        assert!(!sequencer.has_pending());
1337
1338        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1339        assert_eq!(ready.len(), 1);
1340        assert!(!sequencer.has_pending());
1341    }
1342
1343    #[test]
1344    fn test_add_proof_out_of_order() {
1345        let mut sequencer = ProofSequencer::default();
1346        let proof1 = MultiProof::default();
1347        let proof2 = MultiProof::default();
1348        let proof3 = MultiProof::default();
1349        sequencer.next_sequence = 3;
1350
1351        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1352        assert_eq!(ready.len(), 0);
1353        assert!(sequencer.has_pending());
1354
1355        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1356        assert_eq!(ready.len(), 1);
1357        assert!(sequencer.has_pending());
1358
1359        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1360        assert_eq!(ready.len(), 2);
1361        assert!(!sequencer.has_pending());
1362    }
1363
1364    #[test]
1365    fn test_add_proof_with_gaps() {
1366        let mut sequencer = ProofSequencer::default();
1367        let proof1 = MultiProof::default();
1368        let proof3 = MultiProof::default();
1369        sequencer.next_sequence = 3;
1370
1371        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1372        assert_eq!(ready.len(), 1);
1373
1374        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1375        assert_eq!(ready.len(), 0);
1376        assert!(sequencer.has_pending());
1377    }
1378
1379    #[test]
1380    fn test_add_proof_duplicate_sequence() {
1381        let mut sequencer = ProofSequencer::default();
1382        let proof1 = MultiProof::default();
1383        let proof2 = MultiProof::default();
1384
1385        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1386        assert_eq!(ready.len(), 1);
1387
1388        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1389        assert_eq!(ready.len(), 0);
1390        assert!(!sequencer.has_pending());
1391    }
1392
1393    #[test]
1394    fn test_add_proof_batch_processing() {
1395        let mut sequencer = ProofSequencer::default();
1396        let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1397        sequencer.next_sequence = 5;
1398
1399        sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
1400        sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
1401        sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
1402        sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());
1403
1404        let ready =
1405            sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
1406        assert_eq!(ready.len(), 5);
1407        assert!(!sequencer.has_pending());
1408    }
1409
1410    fn create_get_proof_targets_state() -> HashedPostState {
1411        let mut state = HashedPostState::default();
1412
1413        let addr1 = B256::random();
1414        let addr2 = B256::random();
1415        state.accounts.insert(addr1, Some(Default::default()));
1416        state.accounts.insert(addr2, Some(Default::default()));
1417
1418        let mut storage = HashedStorage::default();
1419        let slot1 = B256::random();
1420        let slot2 = B256::random();
1421        storage.storage.insert(slot1, U256::ZERO);
1422        storage.storage.insert(slot2, U256::from(1));
1423        state.storages.insert(addr1, storage);
1424
1425        state
1426    }
1427
1428    #[test]
1429    fn test_get_proof_targets_new_account_targets() {
1430        let state = create_get_proof_targets_state();
1431        let fetched = MultiProofTargets::default();
1432
1433        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1434
1435        assert_eq!(targets.len(), state.accounts.len());
1437        for addr in state.accounts.keys() {
1438            assert!(targets.contains_key(addr));
1439        }
1440    }
1441
1442    #[test]
1443    fn test_get_proof_targets_new_storage_targets() {
1444        let state = create_get_proof_targets_state();
1445        let fetched = MultiProofTargets::default();
1446
1447        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1448
1449        for (addr, storage) in &state.storages {
1451            assert!(targets.contains_key(addr));
1452            let target_slots = &targets[addr];
1453            assert_eq!(target_slots.len(), storage.storage.len());
1454            for slot in storage.storage.keys() {
1455                assert!(target_slots.contains(slot));
1456            }
1457        }
1458    }
1459
1460    #[test]
1461    fn test_get_proof_targets_filter_already_fetched_accounts() {
1462        let state = create_get_proof_targets_state();
1463        let mut fetched = MultiProofTargets::default();
1464
1465        let fetched_addr = state
1467            .accounts
1468            .keys()
1469            .find(|&&addr| !state.storages.contains_key(&addr))
1470            .expect("Should have an account without storage");
1471
1472        fetched.insert(*fetched_addr, HashSet::default());
1474
1475        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1476
1477        assert!(!targets.contains_key(fetched_addr));
1479        assert_eq!(targets.len(), state.accounts.len() - 1);
1481    }
1482
1483    #[test]
1484    fn test_get_proof_targets_filter_already_fetched_storage() {
1485        let state = create_get_proof_targets_state();
1486        let mut fetched = MultiProofTargets::default();
1487
1488        let (addr, storage) = state.storages.iter().next().unwrap();
1490        let mut fetched_slots = HashSet::default();
1491        let fetched_slot = *storage.storage.keys().next().unwrap();
1492        fetched_slots.insert(fetched_slot);
1493        fetched.insert(*addr, fetched_slots);
1494
1495        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1496
1497        let target_slots = &targets[addr];
1499        assert!(!target_slots.contains(&fetched_slot));
1500        assert_eq!(target_slots.len(), storage.storage.len() - 1);
1501    }
1502
1503    #[test]
1504    fn test_get_proof_targets_empty_state() {
1505        let state = HashedPostState::default();
1506        let fetched = MultiProofTargets::default();
1507
1508        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1509
1510        assert!(targets.is_empty());
1511    }
1512
1513    #[test]
1514    fn test_get_proof_targets_mixed_fetched_state() {
1515        let mut state = HashedPostState::default();
1516        let mut fetched = MultiProofTargets::default();
1517
1518        let addr1 = B256::random();
1519        let addr2 = B256::random();
1520        let slot1 = B256::random();
1521        let slot2 = B256::random();
1522
1523        state.accounts.insert(addr1, Some(Default::default()));
1524        state.accounts.insert(addr2, Some(Default::default()));
1525
1526        let mut storage = HashedStorage::default();
1527        storage.storage.insert(slot1, U256::ZERO);
1528        storage.storage.insert(slot2, U256::from(1));
1529        state.storages.insert(addr1, storage);
1530
1531        let mut fetched_slots = HashSet::default();
1532        fetched_slots.insert(slot1);
1533        fetched.insert(addr1, fetched_slots);
1534
1535        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1536
1537        assert!(targets.contains_key(&addr2));
1538        assert!(!targets[&addr1].contains(&slot1));
1539        assert!(targets[&addr1].contains(&slot2));
1540    }
1541
1542    #[test]
1543    fn test_get_proof_targets_unmodified_account_with_storage() {
1544        let mut state = HashedPostState::default();
1545        let fetched = MultiProofTargets::default();
1546
1547        let addr = B256::random();
1548        let slot1 = B256::random();
1549        let slot2 = B256::random();
1550
1551        let mut storage = HashedStorage::default();
1554        storage.storage.insert(slot1, U256::from(1));
1555        storage.storage.insert(slot2, U256::from(2));
1556        state.storages.insert(addr, storage);
1557
1558        assert!(!state.accounts.contains_key(&addr));
1559        assert!(!fetched.contains_key(&addr));
1560
1561        let targets = get_proof_targets(&state, &fetched, &MultiAddedRemovedKeys::new());
1562
1563        assert!(targets.contains_key(&addr));
1565
1566        let target_slots = &targets[&addr];
1567        assert_eq!(target_slots.len(), 2);
1568        assert!(target_slots.contains(&slot1));
1569        assert!(target_slots.contains(&slot2));
1570    }
1571
1572    #[test]
1573    fn test_get_prefetch_proof_targets_no_duplicates() {
1574        let test_provider_factory = create_test_provider_factory();
1575        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1576
1577        let mut targets = MultiProofTargets::default();
1579        let addr1 = B256::random();
1580        let addr2 = B256::random();
1581        let slot1 = B256::random();
1582        let slot2 = B256::random();
1583        targets.insert(addr1, std::iter::once(slot1).collect());
1584        targets.insert(addr2, std::iter::once(slot2).collect());
1585
1586        let prefetch_proof_targets =
1587            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1588
1589        assert_eq!(prefetch_proof_targets, targets);
1592
1593        let addr3 = B256::random();
1595        let slot3 = B256::random();
1596        test_state_root_task.fetched_proof_targets.insert(addr3, std::iter::once(slot3).collect());
1597
1598        let prefetch_proof_targets =
1599            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1600
1601        assert_eq!(prefetch_proof_targets, targets);
1604    }
1605
1606    #[test]
1607    fn test_get_prefetch_proof_targets_remove_subset() {
1608        let test_provider_factory = create_test_provider_factory();
1609        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1610
1611        let mut targets = MultiProofTargets::default();
1613        let addr1 = B256::random();
1614        let addr2 = B256::random();
1615        let slot1 = B256::random();
1616        let slot2 = B256::random();
1617        targets.insert(addr1, std::iter::once(slot1).collect());
1618        targets.insert(addr2, std::iter::once(slot2).collect());
1619
1620        test_state_root_task.fetched_proof_targets.insert(addr1, std::iter::once(slot1).collect());
1622
1623        let prefetch_proof_targets =
1624            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1625
1626        assert_eq!(prefetch_proof_targets.len(), 1);
1628        assert!(!prefetch_proof_targets.contains_key(&addr1));
1629        assert!(prefetch_proof_targets.contains_key(&addr2));
1630
1631        let slot3 = B256::random();
1633        targets.get_mut(&addr1).unwrap().insert(slot3);
1634
1635        let prefetch_proof_targets =
1636            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1637
1638        assert_eq!(prefetch_proof_targets.len(), 2);
1641        assert!(prefetch_proof_targets.contains_key(&addr1));
1642        assert_eq!(
1643            *prefetch_proof_targets.get(&addr1).unwrap(),
1644            std::iter::once(slot3).collect::<B256Set>()
1645        );
1646        assert!(prefetch_proof_targets.contains_key(&addr2));
1647        assert_eq!(
1648            *prefetch_proof_targets.get(&addr2).unwrap(),
1649            std::iter::once(slot2).collect::<B256Set>()
1650        );
1651    }
1652
1653    #[test]
1654    fn test_get_proof_targets_with_removed_storage_keys() {
1655        let mut state = HashedPostState::default();
1656        let mut fetched = MultiProofTargets::default();
1657        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1658
1659        let addr = B256::random();
1660        let slot1 = B256::random();
1661        let slot2 = B256::random();
1662
1663        state.accounts.insert(addr, Some(Default::default()));
1665
1666        let mut storage = HashedStorage::default();
1668        storage.storage.insert(slot1, U256::from(100));
1669        storage.storage.insert(slot2, U256::from(200));
1670        state.storages.insert(addr, storage);
1671
1672        let mut fetched_slots = HashSet::default();
1674        fetched_slots.insert(slot1);
1675        fetched.insert(addr, fetched_slots);
1676
1677        let mut removed_state = HashedPostState::default();
1679        let mut removed_storage = HashedStorage::default();
1680        removed_storage.storage.insert(slot1, U256::ZERO); removed_state.storages.insert(addr, removed_storage);
1682        multi_added_removed_keys.update_with_state(&removed_state);
1683
1684        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1685
1686        assert!(targets.contains_key(&addr));
1688        let target_slots = &targets[&addr];
1689        assert_eq!(target_slots.len(), 2);
1690        assert!(target_slots.contains(&slot1)); assert!(target_slots.contains(&slot2)); }
1693
1694    #[test]
1695    fn test_get_proof_targets_with_wiped_storage() {
1696        let mut state = HashedPostState::default();
1697        let fetched = MultiProofTargets::default();
1698        let multi_added_removed_keys = MultiAddedRemovedKeys::new();
1699
1700        let addr = B256::random();
1701        let slot1 = B256::random();
1702
1703        state.accounts.insert(addr, Some(Default::default()));
1705
1706        let mut storage = HashedStorage::new(true);
1708        storage.storage.insert(slot1, U256::from(100));
1709        state.storages.insert(addr, storage);
1710
1711        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1712
1713        assert!(targets.contains_key(&addr));
1715        let target_slots = &targets[&addr];
1716        assert_eq!(target_slots.len(), 1);
1717        assert!(target_slots.contains(&slot1));
1718    }
1719
1720    #[test]
1721    fn test_get_proof_targets_removed_keys_not_in_state_update() {
1722        let mut state = HashedPostState::default();
1723        let mut fetched = MultiProofTargets::default();
1724        let mut multi_added_removed_keys = MultiAddedRemovedKeys::new();
1725
1726        let addr = B256::random();
1727        let slot1 = B256::random();
1728        let slot2 = B256::random();
1729        let slot3 = B256::random();
1730
1731        state.accounts.insert(addr, Some(Default::default()));
1733
1734        let mut storage = HashedStorage::default();
1736        storage.storage.insert(slot1, U256::from(100));
1737        storage.storage.insert(slot2, U256::from(200));
1738        state.storages.insert(addr, storage);
1739
1740        let mut fetched_slots = HashSet::default();
1742        fetched_slots.insert(slot1);
1743        fetched_slots.insert(slot2);
1744        fetched_slots.insert(slot3); fetched.insert(addr, fetched_slots);
1746
1747        let mut removed_state = HashedPostState::default();
1749        let mut removed_storage = HashedStorage::default();
1750        removed_storage.storage.insert(slot3, U256::ZERO);
1751        removed_state.storages.insert(addr, removed_storage);
1752        multi_added_removed_keys.update_with_state(&removed_state);
1753
1754        let targets = get_proof_targets(&state, &fetched, &multi_added_removed_keys);
1755
1756        assert!(!targets.contains_key(&addr));
1758    }
1759}